File Coverage

blib/lib/AnyEvent/Stomper.pm
Criterion Covered Total %
statement 188 502 37.4
branch 33 168 19.6
condition 15 54 27.7
subroutine 39 70 55.7
pod 4 4 100.0
total 279 798 34.9


line stmt bran cond sub pod time code
1             package AnyEvent::Stomper;
2              
3 4     4   57877 use 5.008000;
  4         9  
4 4     4   15 use strict;
  4         4  
  4         70  
5 4     4   11 use warnings;
  4         11  
  4         95  
6 4     4   12 use base qw( Exporter );
  4         3  
  4         364  
7              
8             our $VERSION = '0.34';
9              
10 4     4   1165 use AnyEvent::Stomper::Frame;
  4         5  
  4         125  
11 4     4   1203 use AnyEvent::Stomper::Error;
  4         6  
  4         83  
12              
13 4     4   3081 use AnyEvent;
  4         15862  
  4         107  
14 4     4   2271 use AnyEvent::Handle;
  4         55600  
  4         138  
15 4     4   24 use Scalar::Util qw( looks_like_number weaken );
  4         8  
  4         283  
16 4     4   16 use List::Util qw( max );
  4         7  
  4         346  
17 4     4   1811 use List::MoreUtils qw( bsearch_index );
  4         25197  
  4         32  
18 4     4   1825 use Carp qw( croak );
  4         6  
  4         292  
19              
20             my %ERROR_CODES;
21              
22             BEGIN {
23 4     4   26 %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
24 4         14 our @EXPORT_OK = keys %ERROR_CODES;
25 4         208 our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
26             }
27              
28             use constant {
29             # Default values
30 4         1961 D_HOST => 'localhost',
31             D_PORT => 61613,
32             D_HEARTBEAT => [ 0, 0 ],
33              
34             %ERROR_CODES,
35              
36             # Operation status
37             S_NEED_DO => 1,
38             S_IN_PROGRESS => 2,
39             S_DONE => 3,
40              
41             EOL => "\n",
42             RE_EOL => qr/\r?\n/,
43 4     4   18 };
  4         4  
44              
45             my %SUBUNSUB_CMDS = (
46             SUBSCRIBE => 1,
47             UNSUBSCRIBE => 1,
48             );
49              
50             my %ACK_CMDS = (
51             ACK => 1,
52             NACK => 1,
53             );
54              
55             my %NEED_RECEIPT = (
56             CONNECT => 1,
57             DISCONNECT => 1,
58             %SUBUNSUB_CMDS,
59             );
60              
61             my %ESCAPE_MAP = (
62             "\r" => "\\r",
63             "\n" => "\\n",
64             ':' => "\\c",
65             "\\" => "\\\\",
66             );
67             my %UNESCAPE_MAP = reverse %ESCAPE_MAP;
68              
69             my $RECEIPT_SEQ = 1;
70             my $MESSAGE_SEQ = 1;
71              
72              
73             sub new {
74 15     15 1 2598 my $class = shift;
75 15         29 my %params = @_;
76              
77 15         25 my $self = bless {}, $class;
78              
79 15   100     77 $self->{host} = $params{host} || D_HOST;
80 15   100     81 $self->{port} = $params{port} || D_PORT;
81 15         16 $self->{login} = $params{login};
82 15         17 $self->{passcode} = $params{passcode};
83 15         14 $self->{vhost} = $params{vhost};
84 15         15 $self->{lazy} = $params{lazy};
85 15   50     48 $self->{handle_params} = $params{handle_params} || {};
86 15   50     57 $self->{default_headers} = $params{default_headers} || {};
87 15         20 $self->{on_connect} = $params{on_connect};
88 15         18 $self->{on_disconnect} = $params{on_disconnect};
89              
90 15 100       23 if ( defined $params{heartbeat} ) {
91 2 100       6 unless ( ref( $params{heartbeat} ) eq 'ARRAY' ) {
92 1         188 croak q{"heartbeat" must be specified as array reference};
93             }
94              
95 1         2 foreach my $val ( @{ $params{heartbeat} } ) {
  1         3  
96 1 50       5 if ( $val =~ /\D/ ) {
97 1         98 croak q{"heartbeat" values must be an integer numbers};
98             }
99             }
100              
101 0         0 $self->{heartbeat} = $params{heartbeat};
102             }
103             else {
104 13         16 $self->{heartbeat} = D_HEARTBEAT;
105             }
106              
107 13 50       26 if ( defined $params{command_headers} ) {
108 0 0       0 unless ( ref( $params{command_headers} ) eq 'HASH' ) {
109 0         0 croak q{"command_headers" must be specified as hash reference};
110             }
111              
112 0         0 my %command_headers;
113 0         0 while ( my ( $cmd_name, $headers ) = each %{ $params{command_headers} } ) {
  0         0  
114 0         0 $command_headers{ uc($cmd_name) } = $headers;
115             }
116 0         0 $self->{command_headers} = \%command_headers;
117             }
118              
119 13         36 $self->connection_timeout( $params{connection_timeout} );
120 11         36 $self->reconnect_interval( $params{reconnect_interval} );
121 9         24 $self->on_error( $params{on_error} );
122              
123 9         20 $self->_reset_internals;
124 9         12 $self->{_input_queue} = [];
125 9         12 $self->{_temp_input_queue} = [];
126 9         10 $self->{_write_queue} = [];
127 9         10 $self->{_temp_write_queue} = [];
128 9         34 $self->{_pending_receipts} = {};
129 9         9 $self->{_subs} = {};
130              
131 9 100       20 unless ( $self->{lazy} ) {
132 5         10 $self->_connect;
133             }
134              
135 9         29 return $self;
136             }
137              
138             sub execute {
139 0     0 1 0 my $self = shift;
140 0         0 my $cmd_name = shift;
141              
142 0         0 my $cmd = $self->_prepare( $cmd_name, [@_] );
143 0         0 $self->_execute($cmd);
144              
145 0         0 return;
146             }
147              
148             # Generate methods
149             {
150 4     4   17 no strict qw( refs );
  4         5  
  4         597  
151              
152             foreach my $name ( qw( send subscribe unsubscribe ack nack begin commit
153             abort disconnect ) )
154             {
155             *{$name} = sub {
156 2     2   86 my $self = shift;
157              
158 2         8 my $cmd = $self->_prepare( $name, [@_] );
159 2         5 $self->_execute($cmd);
160              
161 0         0 return;
162             }
163             }
164             }
165              
166             sub on_error {
167 14     14 1 343 my $self = shift;
168              
169 14 100       33 if (@_) {
170 11         9 my $on_error = shift;
171              
172 11 100       23 if ( defined $on_error ) {
173 5         6 $self->{on_error} = $on_error;
174             }
175             else {
176             $self->{on_error} = sub {
177 1     1   1 my $err = shift;
178 1         4 warn $err->message . "\n";
179 6         26 };
180             }
181             }
182              
183 14         21 return $self->{on_error};
184             }
185              
186             # Generate accessors
187             {
188 4     4   17 no strict qw( refs );
  4         3  
  4         12040  
189              
190             foreach my $name ( qw( host port ) ) {
191             *{$name} = sub {
192 2     2   1733 my $self = shift;
193 2         7 return $self->{$name};
194             }
195             }
196              
197             foreach my $name ( qw( connection_timeout reconnect_interval ) ) {
198             *{$name} = sub {
199 38     38   963 my $self = shift;
200              
201 38 100       66 if (@_) {
202 32         34 my $seconds = shift;
203              
204 32 100 100     109 if ( defined $seconds
      66        
205             && ( !looks_like_number($seconds) || $seconds < 0 ) )
206             {
207 8         775 croak qq{"$name" must be a positive number};
208             }
209 24         31 $self->{$name} = $seconds;
210             }
211              
212 30         80 return $self->{$name};
213             };
214             }
215              
216             foreach my $name ( qw( on_connect on_disconnect ) ) {
217             *{$name} = sub {
218 10     10   514 my $self = shift;
219              
220 10 100       19 if (@_) {
221 4         5 $self->{$name} = shift;
222             }
223              
224 10         23 return $self->{$name};
225             };
226             }
227             }
228              
229             sub force_disconnect {
230 0     0 1 0 my $self = shift;
231              
232 0         0 $self->_disconnect();
233              
234 0         0 return;
235             }
236              
237             sub _connect {
238 5     5   6 my $self = shift;
239              
240             $self->{_handle} = AnyEvent::Handle->new(
241 5         23 %{ $self->{handle_params} },
242 5         6 connect => [ $self->{host}, $self->{port} ],
243             on_prepare => $self->_create_on_prepare,
244             on_connect => $self->_create_on_connect,
245             on_connect_error => $self->_create_on_connect_error,
246             on_wtimeout => $self->_create_on_wtimeout,
247             on_rtimeout => $self->_create_on_rtimeout,
248             on_eof => $self->_create_on_eof,
249             on_error => $self->_create_on_handle_error,
250             on_drain => $self->_create_on_drain,
251             on_read => $self->_create_on_read,
252             );
253              
254 5         601 return;
255             }
256              
257             sub _create_on_prepare {
258 5     5   6 my $self = shift;
259              
260 5         15 weaken($self);
261              
262             return sub {
263 5 100   5   29715 if ( defined $self->{connection_timeout} ) {
264 1         3 return $self->{connection_timeout};
265             }
266              
267 4         11 return;
268 5         18 };
269             }
270              
271             sub _create_on_connect {
272 5     5   6 my $self = shift;
273              
274 5         8 weaken($self);
275              
276             return sub {
277 0     0   0 $self->{_connected} = 1;
278 0         0 $self->_login;
279              
280 0 0       0 if ( defined $self->{on_connect} ) {
281 0         0 $self->{on_connect}->();
282             }
283 5         13 };
284             }
285              
286             sub _create_on_connect_error {
287 5     5   5 my $self = shift;
288              
289 5         7 weaken($self);
290              
291             return sub {
292 0     0   0 my $err_msg = pop;
293              
294 0         0 my $err = _new_error(
295             "Can't connect to $self->{host}:$self->{port}: $err_msg",
296             E_CANT_CONN
297             );
298 0         0 $self->_disconnect($err);
299 5         16 };
300             }
301              
302             sub _create_on_wtimeout {
303 5     5   6 my $self = shift;
304              
305 5         7 weaken($self);
306              
307             return sub {
308 0     0   0 $self->{_handle}->push_write(EOL);
309 5         11 };
310             }
311              
312             sub _create_on_rtimeout {
313 5     5   6 my $self = shift;
314              
315 5         9 weaken($self);
316              
317             return sub {
318 0     0   0 my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
319 0         0 $self->_disconnect($err);
320 5         13 };
321             }
322              
323             sub _create_on_eof {
324 5     5   6 my $self = shift;
325              
326 5         10 weaken($self);
327              
328             return sub {
329 0     0   0 my $err = _new_error( 'Connection closed by remote host.',
330             E_CONN_CLOSED_BY_REMOTE_HOST );
331 0         0 $self->_disconnect($err);
332 5         14 };
333             }
334              
335             sub _create_on_handle_error {
336 5     5   6 my $self = shift;
337              
338 5         6 weaken($self);
339              
340             return sub {
341 0     0   0 my $err_msg = pop;
342              
343 0         0 my $err = _new_error( $err_msg, E_IO );
344 0         0 $self->_disconnect($err);
345 5         15 };
346             }
347              
348             sub _create_on_drain {
349 5     5   6 my $self = shift;
350              
351 5         8 weaken($self);
352              
353             return sub {
354 0 0   0   0 return unless @{ $self->{_write_queue} };
  0         0  
355              
356 0         0 $self->{_temp_write_queue} = $self->{_write_queue};
357 0         0 $self->{_write_queue} = [];
358              
359 0         0 while ( my $cmd = shift @{ $self->{_temp_write_queue} } ) {
  0         0  
360 0         0 $cmd->{on_receipt}->();
361             }
362 5         34 };
363             }
364              
365             sub _create_on_read {
366 5     5   5 my $self = shift;
367              
368 5         7 weaken($self);
369              
370 5         4 my $cmd_name;
371             my $headers;
372              
373             return sub {
374 0     0   0 my $handle = shift;
375              
376 0         0 my $frame;
377              
378 0         0 while (1) {
379 0 0       0 return if $handle->destroyed;
380              
381 0 0       0 if ( defined $cmd_name ) {
382 0         0 my $content_length;
383 0 0       0 if ( defined $headers->{'content-length'} ) {
384 0         0 $content_length = $headers->{'content-length'};
385 0 0       0 return if length( $handle->{rbuf} ) < $content_length + 1;
386             }
387             else {
388 0         0 $content_length = index( $handle->{rbuf}, "\0" );
389 0 0       0 return if $content_length < 0
390             }
391              
392 0         0 my $body = substr( $handle->{rbuf}, 0, $content_length, '' );
393 0         0 $handle->{rbuf} =~ s/^\0(?:${\(RE_EOL)})*//;
  0         0  
394              
395 0         0 $frame = _new_frame( $cmd_name, $headers, $body );
396              
397 0         0 undef $cmd_name;
398 0         0 undef $headers;
399             }
400             else {
401 0         0 $handle->{rbuf} =~ s/^(?:${\(RE_EOL)})+//;
  0         0  
402              
403 0 0       0 return unless $handle->{rbuf} =~ s/^(.+?)(?:${\(RE_EOL)}){2}//s;
  0         0  
404              
405 0         0 ( $cmd_name, my @header_strings ) = split( m/${\(RE_EOL)}/, $1 );
  0         0  
406 0         0 foreach my $header_str (@header_strings) {
407 0         0 my ( $name, $value ) = split( /:/, $header_str, 2 );
408 0         0 $headers->{ _unescape($name) } = _unescape($value);
409             }
410              
411 0         0 next;
412             }
413              
414 0         0 $self->_process_frame($frame);
415             }
416 5         39 };
417             }
418              
419             sub _prepare {
420 2     2   2 my $self = shift;
421 2         4 my $cmd_name = uc(shift);
422 2         2 my $args = shift;
423              
424 2         2 my %params;
425              
426 2 50 33     8 if ( ref( $args->[-1] ) eq 'CODE'
427 0         0 && scalar @{$args} % 2 > 0 )
428             {
429 0 0       0 if ( $cmd_name eq 'SUBSCRIBE' ) {
430 0         0 $params{on_message} = pop @{$args};
  0         0  
431             }
432             else {
433 0         0 $params{on_receipt} = pop @{$args};
  0         0  
434             }
435             }
436              
437 2         3 my %headers = @{$args};
  2         5  
438              
439 2         4 foreach my $name ( qw( body on_receipt on_message ) ) {
440 6 50       12 if ( defined $headers{$name} ) {
441 0         0 $params{$name} = delete $headers{$name};
442             }
443             }
444 2 100       7 if ( exists $ACK_CMDS{$cmd_name} ) {
445 1         3 $params{message} = delete $headers{message};
446             }
447              
448             %headers = (
449 2         12 %{ $self->{default_headers} },
450              
451             defined $self->{command_headers}{$cmd_name}
452 2 50       2 ? %{ $self->{command_headers}{$cmd_name} }
  0         0  
453             : (),
454              
455             %headers,
456             );
457              
458 2         8 my $cmd = {
459             name => $cmd_name,
460             headers => \%headers,
461             %params,
462             };
463              
464 2 50       5 unless ( defined $cmd->{on_receipt} ) {
465 2         5 weaken($self);
466              
467             $cmd->{on_receipt} = sub {
468 0     0   0 my $receipt = shift;
469 0         0 my $err = shift;
470              
471 0 0       0 if ( defined $err ) {
472 0         0 $self->{on_error}->($err);
473 0         0 return;
474             }
475 2         7 };
476             }
477              
478 2         5 return $cmd;
479             }
480              
481             sub _execute {
482 2     2   3 my $self = shift;
483 2         1 my $cmd = shift;
484              
485 2 100 66     19 if ( $cmd->{name} eq 'SUBSCRIBE'
    50 33        
486             && !defined $cmd->{on_message} )
487             {
488 1         78 croak '"on_message" callback must be specified';
489             }
490             elsif ( exists $ACK_CMDS{ $cmd->{name} }
491             && !defined $cmd->{message} )
492             {
493 1         116 croak '"message" parameter must be specified';
494             }
495              
496 0 0       0 unless ( $self->{_ready} ) {
497 0 0       0 if ( defined $self->{_handle} ) {
    0          
498 0 0       0 if ( $self->{_connected} ) {
499 0 0       0 if ( $self->{_login_state} == S_NEED_DO ) {
500 0         0 $self->_login;
501             }
502             }
503             }
504             elsif ( $self->{lazy} ) {
505 0         0 undef $self->{lazy};
506 0         0 $self->_connect;
507             }
508             else {
509 0 0 0     0 if ( defined $self->{reconnect_interval}
510             && $self->{reconnect_interval} > 0 )
511             {
512 0 0       0 unless ( defined $self->{_reconnect_timer} ) {
513 0         0 weaken($self);
514              
515             $self->{_reconnect_timer} = AE::timer(
516             $self->{reconnect_interval}, 0,
517             sub {
518 0     0   0 undef $self->{_reconnect_timer};
519 0         0 $self->_connect;
520             }
521 0         0 );
522             }
523             }
524             else {
525 0         0 $self->_connect;
526             }
527             }
528              
529 0         0 push( @{ $self->{_input_queue} }, $cmd );
  0         0  
530              
531 0         0 return;
532             }
533              
534 0         0 $self->_push_write($cmd);
535              
536 0         0 return;
537             }
538              
539             sub _push_write {
540 0     0   0 my $self = shift;
541 0         0 my $cmd = shift;
542              
543 0         0 my $cmd_headers = $cmd->{headers};
544              
545 0 0       0 if ( exists $ACK_CMDS{ $cmd->{name} } ) {
546 0 0       0 unless ( $self->_check_ack( $cmd->{message} ) ) {
547 0         0 my $err = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR );
548 0     0   0 AE::postpone { $cmd->{on_receipt}->( undef, $err ) };
  0         0  
549              
550 0         0 return;
551             }
552              
553 0         0 my $msg_headers = $cmd->{message}->headers;
554              
555 0 0       0 if ( $self->{_version} <= 1.1 ) {
556 0         0 $cmd_headers->{'message-id'} = $msg_headers->{'message-id'};
557 0 0       0 if ( $self->{_version} > 1.0 ) {
558 0         0 $cmd_headers->{subscription} = $msg_headers->{subscription};
559             }
560             }
561             else {
562 0         0 $cmd_headers->{id} = $msg_headers->{ack};
563             }
564             }
565              
566 0 0 0     0 if ( exists $NEED_RECEIPT{ $cmd->{name} }
567             || defined $cmd_headers->{receipt} )
568             {
569 0 0       0 if ( $cmd->{name} eq 'CONNECT' ) {
570 0         0 $self->{_pending_receipts}{CONNECTED} = $cmd;
571             }
572             else {
573 0 0 0     0 if ( !defined $cmd_headers->{receipt}
574             || $cmd_headers->{receipt} eq 'auto' )
575             {
576 0         0 $cmd_headers->{receipt} = 'R_' . $self->{_session_id} . '.'
577             . $RECEIPT_SEQ++;
578             }
579 0         0 $self->{_pending_receipts}{ $cmd_headers->{receipt} } = $cmd;
580             }
581             }
582             else {
583 0         0 push( @{ $self->{_write_queue} }, $cmd );
  0         0  
584             }
585              
586 0         0 my $body = $cmd->{body};
587 0 0       0 unless ( defined $body ) {
588 0         0 $body = '';
589             }
590 0 0       0 unless ( defined $cmd_headers->{'content-length'} ) {
591 0         0 $cmd_headers->{'content-length'} = length($body);
592             }
593              
594 0         0 my $frame_str = $cmd->{name} . EOL;
595 0         0 while ( my ( $name, $value ) = each %{$cmd_headers} ) {
  0         0  
596 0 0       0 unless ( defined $value ) {
597 0         0 $value = '';
598             }
599 0         0 $frame_str .= _escape($name) . ':' . _escape($value) . EOL;
600             }
601 0         0 $frame_str .= EOL . "$body\0";
602              
603 0         0 $self->{_handle}->push_write($frame_str);
604              
605 0         0 return;
606             }
607              
608             sub _login {
609 0     0   0 my $self = shift;
610              
611 0         0 my ( $cx, $cy ) = @{ $self->{heartbeat} };
  0         0  
612              
613 0 0       0 if ( $cy > 0 ) {
614 0         0 $self->_rtimeout($cy);
615             }
616              
617 0         0 my %cmd_headers = (
618             'accept-version' => '1.0,1.1,1.2',
619             'heart-beat' => join( ',', $cx, $cy ),
620             );
621 0 0       0 if ( defined $self->{login} ) {
622 0         0 $cmd_headers{login} = $self->{login};
623             }
624 0 0       0 if ( defined $self->{passcode} ) {
625 0         0 $cmd_headers{passcode} = $self->{passcode};
626             }
627 0 0       0 if ( defined $self->{vhost} ) {
628 0         0 $cmd_headers{host} = $self->{vhost};
629             }
630              
631 0         0 weaken($self);
632 0         0 $self->{_login_state} = S_IN_PROGRESS;
633              
634             $self->_push_write(
635             { name => 'CONNECT',
636             headers => \%cmd_headers,
637              
638             on_receipt => sub {
639 0     0   0 my $receipt = shift;
640 0         0 my $err = shift;
641              
642 0 0       0 if ( defined $err ) {
643 0         0 $self->{_login_state} = S_NEED_DO;
644 0         0 $self->_abort($err);
645              
646 0         0 return;
647             }
648              
649 0         0 $self->{_login_state} = S_DONE;
650              
651 0         0 my $receipt_headers = $receipt->headers;
652              
653 0 0       0 if ( defined $receipt_headers->{'heart-beat'} ) {
654 0         0 my ( $sx, $sy ) = split( /,/, $receipt_headers->{'heart-beat'} );
655              
656 0 0       0 if ( $sx > 0 ) {
657 0         0 $self->_rtimeout( max( $cy, $sx ) );
658             }
659 0 0       0 if ( $sy > 0 ) {
660 0         0 $self->_wtimeout( max( $cx, $sy ) );
661             }
662             }
663              
664 0         0 $self->{_ready} = 1;
665             $self->{_version}
666 0   0     0 = version->parse( $receipt_headers->{version} || 1.0 );
667 0   0     0 $self->{_session_id} = $receipt_headers->{session} || '';
668              
669 0         0 $self->_process_input_queue;
670             },
671             }
672 0         0 );
673              
674 0         0 return;
675             }
676              
677             sub _rtimeout {
678 0     0   0 my $self = shift;
679 0         0 my $rtimeout = shift;
680              
681 0         0 $self->{_handle}->rtimeout_reset;
682 0         0 $self->{_handle}->rtimeout( ( $rtimeout / 1000 ) * 3 );
683              
684 0         0 return;
685             }
686              
687             sub _wtimeout {
688 0     0   0 my $self = shift;
689 0         0 my $wtimeout = shift;
690              
691 0         0 $self->{_handle}->wtimeout_reset;
692 0         0 $self->{_handle}->wtimeout( $wtimeout / 1000 );
693              
694 0         0 return;
695             }
696              
697             sub _process_input_queue {
698 0     0   0 my $self = shift;
699              
700 0         0 $self->{_temp_input_queue} = $self->{_input_queue};
701 0         0 $self->{_input_queue} = [];
702              
703 0         0 while ( my $cmd = shift @{ $self->{_temp_input_queue} } ) {
  0         0  
704 0         0 $self->_push_write($cmd);
705             }
706              
707 0         0 return;
708             }
709              
710             sub _check_ack {
711 0     0   0 my $self = shift;
712 0         0 my $msg = shift;
713              
714 0         0 my $msg_headers = $msg->headers;
715 0   0     0 my $sub_id = $msg_headers->{subscription} || $msg_headers->{destination};
716 0         0 my $sub = $self->{_subs}{$sub_id};
717 0         0 my $msg_tag = $msg_headers->{'message-tag'};
718              
719 0 0       0 if ( defined $sub ) {
720 0 0       0 if ( defined $sub->{pending_acks} ) {
721 0 0       0 if ( ref( $sub->{pending_acks} ) eq 'ARRAY' ) {
722             my $i = bsearch_index {
723 0 0   0   0 $msg_tag > $_ ? -1 : $msg_tag < $_ ? 1 : 0;
    0          
724             }
725 0         0 @{ $sub->{pending_acks} };
  0         0  
726              
727 0 0       0 if ( $i >= 0 ) {
728 0         0 splice( @{ $sub->{pending_acks} }, 0, $i + 1 );
  0         0  
729 0         0 return 1;
730             }
731             }
732             else { # HASH
733 0 0       0 return 1 if delete $sub->{pending_acks}{$msg_tag};
734             }
735             }
736             }
737              
738 0         0 return;
739             }
740              
741             sub _process_frame {
742 0     0   0 my $self = shift;
743 0         0 my $frame = shift;
744              
745 0 0       0 if ( $frame->command eq 'MESSAGE' ) {
    0          
    0          
746 0         0 $self->_process_message($frame);
747             }
748             elsif ( $frame->command eq 'RECEIPT' ) {
749 0         0 $self->_process_receipt($frame);
750             }
751             elsif ( $frame->command eq 'ERROR' ) {
752 0 0       0 if ( defined $self->{_pending_receipts}{CONNECTED} ) {
753 0         0 $frame->headers->{'receipt-id'} = 'CONNECTED';
754             }
755 0         0 $self->_process_error($frame);
756             }
757             else { # CONNECTED
758 0         0 $frame->headers->{'receipt-id'} = 'CONNECTED';
759 0         0 $self->_process_receipt($frame);
760             }
761              
762 0         0 return;
763             }
764              
765             sub _process_message {
766 0     0   0 my $self = shift;
767 0         0 my $msg = shift;
768              
769 0         0 my $msg_headers = $msg->headers;
770 0   0     0 my $sub_id = $msg_headers->{subscription} || $msg_headers->{destination};
771 0         0 my $sub = $self->{_subs}{$sub_id};
772              
773 0 0       0 unless ( defined $sub ) {
774 0         0 my $err = _new_error(
775             qq{Don't know how process MESSAGE frame. Unknown subscription "$sub_id"},
776             E_UNEXPECTED_DATA
777             );
778 0         0 $self->_disconnect($err);
779              
780 0         0 return;
781             }
782              
783 0         0 my $msg_tag = $MESSAGE_SEQ++;
784 0         0 $msg_headers->{'message-tag'} = $msg_tag;
785              
786 0 0       0 if ( defined $sub->{pending_acks} ) {
787 0 0       0 if ( ref( $sub->{pending_acks} ) eq 'ARRAY' ) {
788 0         0 push( @{ $sub->{pending_acks} }, $msg_tag );
  0         0  
789             }
790             else { # HASH
791 0         0 $sub->{pending_acks}{$msg_tag} = 1;
792             }
793             }
794              
795 0         0 $sub->{on_message}->($msg);
796              
797 0         0 return;
798             }
799              
800             sub _process_receipt {
801 0     0   0 my $self = shift;
802 0         0 my $receipt = shift;
803              
804 0         0 my $receipt_id = $receipt->headers->{'receipt-id'};
805 0         0 my $cmd = delete $self->{_pending_receipts}{$receipt_id};
806              
807 0 0       0 unless ( defined $cmd ) {
808 0         0 my $err = _new_error(
809             qq{Unknown RECEIPT frame received: receipt-id=$receipt_id},
810             E_UNEXPECTED_DATA
811             );
812 0         0 $self->_disconnect($err);
813              
814 0         0 return;
815             }
816              
817 0 0       0 if ( exists $SUBUNSUB_CMDS{ $cmd->{name} } ) {
    0          
818 0         0 my $cmd_headers = $cmd->{headers};
819 0   0     0 my $sub_id = $cmd_headers->{id} || $cmd_headers->{destination};
820              
821 0 0       0 if ( $cmd->{name} eq 'SUBSCRIBE' ) {
822 0         0 $self->{_subs}{$sub_id} = $cmd;
823              
824 0 0       0 if ( defined $cmd_headers->{ack} ) {
825 0 0       0 if ( $cmd_headers->{ack} eq 'client' ) {
    0          
826 0         0 $cmd->{pending_acks} = [];
827             }
828             elsif ( $cmd_headers->{ack} eq 'client-individual' ) {
829 0         0 $cmd->{pending_acks} = {};
830             }
831             }
832             }
833             else { # UNSUBSCRIBE
834 0         0 delete $self->{_subs}{$sub_id};
835             }
836             }
837             elsif ( $cmd->{name} eq 'DISCONNECT' ) {
838 0         0 $self->_disconnect;
839             }
840              
841 0         0 $cmd->{on_receipt}->($receipt);
842              
843 0         0 return;
844             }
845              
846             sub _process_error {
847 0     0   0 my $self = shift;
848 0         0 my $err_frame = shift;
849              
850 0         0 my $err_headers = $err_frame->headers;
851 0         0 my $err = _new_error( $err_headers->{message}, E_OPRN_ERROR, $err_frame );
852              
853 0         0 my $cmd;
854 0 0       0 if ( defined $err_headers->{'receipt-id'} ) {
855 0         0 $cmd = delete $self->{_pending_receipts}{ $err_headers->{'receipt-id'} };
856             }
857              
858 0 0       0 if ( defined $cmd ) {
859 0         0 $cmd->{on_receipt}->( undef, $err );
860             }
861             else {
862 0         0 $self->_disconnect($err);
863             }
864              
865 0         0 return;
866             }
867              
868             sub _disconnect {
869 0     0   0 my $self = shift;
870 0         0 my $err = shift;
871              
872 0         0 my $was_connected = $self->{_connected};
873              
874 0 0       0 if ( defined $self->{_handle} ) {
875 0         0 $self->{_handle}->destroy;
876             }
877 0         0 $self->_reset_internals;
878 0         0 $self->_abort($err);
879              
880 0 0 0     0 if ( $was_connected && defined $self->{on_disconnect} ) {
881 0         0 $self->{on_disconnect}->();
882             }
883              
884 0         0 return;
885             }
886              
887             sub _reset_internals {
888 9     9   10 my $self = shift;
889              
890 9         11 $self->{_handle} = undef;
891 9         18 $self->{_connected} = 0;
892 9         14 $self->{_login_state} = S_NEED_DO;
893 9         9 $self->{_ready} = 0;
894 9         12 $self->{_version} = undef;
895 9         9 $self->{_session_id} = undef;
896 9         11 $self->{_reconnect_timer} = undef;
897              
898 9         9 return;
899             }
900              
901             sub _abort {
902 0     0   0 my $self = shift;
903 0         0 my $err = shift;
904              
905 0         0 my @queued_commands = $self->_queued_commands;
906 0         0 my %subs = %{ $self->{_subs} };
  0         0  
907              
908 0         0 $self->{_input_queue} = [];
909 0         0 $self->{_temp_input_queue} = [];
910 0         0 $self->{_write_queue} = [];
911 0         0 $self->{_temp_write_queue} = [];
912 0         0 $self->{_pending_receipts} = {};
913 0         0 $self->{_subs} = {};
914              
915 0 0 0     0 if ( !defined $err && @queued_commands ) {
916 0         0 $err = _new_error( 'Connection closed by client prematurely.',
917             E_CONN_CLOSED_BY_CLIENT );
918             }
919              
920 0 0       0 if ( defined $err ) {
921 0         0 my $err_msg = $err->message;
922 0         0 my $err_code = $err->code;
923 0         0 my $err_frame = $err->frame;
924              
925 0         0 $self->{on_error}->($err);
926              
927 0 0 0     0 if ( %subs && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
928 0         0 foreach my $sub_id ( keys %subs ) {
929 0         0 my $err = _new_error( qq{Subscription "$sub_id" lost: $err_msg},
930             $err_code, $err_frame );
931              
932 0         0 my $sub = $subs{$sub_id};
933 0         0 $sub->{on_receipt}->( undef, $err );
934             }
935             }
936              
937 0         0 foreach my $cmd (@queued_commands) {
938 0         0 my $err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
939             $err_code, $err_frame );
940 0         0 $cmd->{on_receipt}->( undef, $err );
941             }
942             }
943              
944 0         0 return;
945             }
946              
947             sub _queued_commands {
948 9     9   8 my $self = shift;
949              
950             return (
951 9         19 values %{ $self->{_pending_receipts} },
952 9         12 @{ $self->{_temp_write_queue} },
953 9         11 @{ $self->{_write_queue} },
954 9         8 @{ $self->{_temp_input_queue} },
955 9         9 @{ $self->{_input_queue} },
  9         16  
956             );
957             }
958              
959             sub _escape {
960 0     0   0 my $str = shift;
961              
962 0         0 $str =~ s/([\r\n:\\])/$ESCAPE_MAP{$1}/ge;
  0         0  
963              
964 0         0 return $str;
965             }
966              
967             sub _unescape {
968 0     0   0 my $str = shift;
969              
970 0         0 $str =~ s/(\\[rnc\\])/$UNESCAPE_MAP{$1}/ge;
  0         0  
971              
972 0         0 return $str;
973             }
974              
975             sub _new_frame {
976 0     0   0 return AnyEvent::Stomper::Frame->new(@_);
977             }
978              
979             sub _new_error {
980 0     0   0 return AnyEvent::Stomper::Error->new(@_);
981             }
982              
983             sub DESTROY {
984 15     15   2299 my $self = shift;
985              
986 15 100       36 if ( defined $self->{_handle} ) {
987 5         19 $self->{_handle}->destroy;
988             }
989              
990 15 100       445 if ( defined $self->{_pending_receipts} ) {
991 9         20 my @queued_commands = $self->_queued_commands;
992              
993 9         14 foreach my $cmd (@queued_commands) {
994 0         0 warn qq{Operation "$cmd->{name}" aborted:}
995             . " Client object destroyed prematurely.\n";
996             }
997             }
998              
999 15         182 return;
1000             }
1001              
1002             1;
1003             __END__