File Coverage

blib/lib/Event/RPC/Message.pm
Criterion Covered Total %
statement 78 86 90.7
branch 25 44 56.8
condition 3 8 37.5
subroutine 12 18 66.6
pod 0 15 0.0
total 118 171 69.0


line stmt bran cond sub pod time code
1             #-----------------------------------------------------------------------
2             # Copyright (C) 2005-2015 by Jörn Reder .
3             # All Rights Reserved. See file COPYRIGHT for details.
4             #
5             # This module is part of Event::RPC, which is free software; you can
6             # redistribute it and/or modify it under the same terms as Perl itself.
7             #-----------------------------------------------------------------------
8              
9             package Event::RPC::Message;
10              
11 31     31   179 use Carp;
  31         38  
  31         1491  
12 31     31   152 use strict;
  31         59  
  31         553  
13 31     31   109 use utf8;
  31         37  
  31         110  
14              
15             my $DEBUG = 0;
16             my $MAX_PACKET_SIZE = 2*1024*1024*1024;
17              
18 4211     4211 0 585042 sub get_sock { shift->{sock} }
19              
20 0     0 0 0 sub get_buffer { shift->{buffer} }
21 0     0 0 0 sub get_length { shift->{length} }
22 0     0 0 0 sub get_written { shift->{written} }
23              
24 0     0 0 0 sub set_buffer { shift->{buffer} = $_[1] }
25 0     0 0 0 sub set_length { shift->{length} = $_[1] }
26 0     0 0 0 sub set_written { shift->{written} = $_[1] }
27              
28             sub get_max_packet_size {
29 1     1 0 6 return $MAX_PACKET_SIZE;
30             }
31              
32             sub set_max_packet_size {
33 2     2 0 6 my $class = shift;
34 2         5 my ($value) = @_;
35 2         16 $MAX_PACKET_SIZE = $value;
36             }
37              
38             sub new {
39 856     856 0 1648 my $class = shift;
40 856         1839 my ($sock) = @_;
41              
42 856         5629 my $self = bless {
43             sock => $sock,
44             buffer => undef,
45             length => 0,
46             written => 0,
47             }, $class;
48              
49 856         2311 return $self;
50             }
51              
52             sub read {
53 856     856 0 1394 my $self = shift;
54 856         5542 my ($blocking) = @_;
55              
56 856 100       2091 $self->get_sock->blocking($blocking?1:0);
57            
58 856 50       13504 if ( not defined $self->{buffer} ) {
59 856         1734 my $length_packed;
60 856 50       1749 $DEBUG && print "DEBUG: going to read header...\n";
61 856         1912 my $rc = sysread ($self->get_sock, $length_packed, 4);
62 856 50       44912 $DEBUG && print "DEBUG: header read rc=$rc\n";
63 856 100 66     4624 die "DISCONNECTED" if !(defined $rc) || $rc == 0;
64 833         2807 $self->{length} = unpack("N", $length_packed);
65 833 50       2036 $DEBUG && print "DEBUG: packet size=$self->{length}\n";
66             die "Incoming message size exceeds limit of $MAX_PACKET_SIZE bytes"
67 833 50       2035 if $self->{length} > $MAX_PACKET_SIZE;
68             }
69              
70 833   50     3269 my $buffer_length = length($self->{buffer}||'');
71              
72 833 50       1716 $DEBUG && print "DEBUG: going to read packet... (buffer_length=$buffer_length)\n";
73              
74             my $rc = sysread (
75             $self->get_sock,
76             $self->{buffer},
77 833         2121 $self->{length} - $buffer_length,
78             $buffer_length
79             );
80              
81 833 50       3905 $DEBUG && print "DEBUG: packet read rc=$rc\n";
82              
83 833 50       1958 return if not defined $rc;
84 833 50       1650 die "DISCONNECTED" if $rc == 0;
85              
86 833         1456 $buffer_length = length($self->{buffer});
87              
88             $DEBUG && print "DEBUG: more to read... ($self->{length} != $buffer_length)\n"
89 833 50 0     1702 if $self->{length} != $buffer_length;
90              
91 833 50       1788 return if $self->{length} != $buffer_length;
92              
93 833 50       1645 $DEBUG && print "DEBUG: read finished, length=$buffer_length\n";
94              
95 833         4390 my $data = $self->decode_message($self->{buffer});
96              
97 823         10265 $self->{buffer} = undef;
98 823         1695 $self->{length} = 0;
99              
100 823         3046 return $data;
101             }
102              
103             sub read_blocked {
104 684     684 0 1247 my $self = shift;
105              
106 684         1180 my $rc;
107 684         7023 $rc = $self->read(1) while not defined $rc;
108              
109 677         1965 return $rc;
110             }
111              
112             sub set_data {
113 833     833 0 1244 my $self = shift;
114 833         1433 my ($data) = @_;
115              
116 833 50       1792 $DEBUG && print "DEBUG: Message->set_data($data)\n";
117              
118 833         3510 my $packed = $self->encode_message($data);
119              
120 833 100       15338 if ( length($packed) > $MAX_PACKET_SIZE ) {
121 1         11 Event::RPC::Server->instance->log("ERROR: response packet exceeds limit of $MAX_PACKET_SIZE bytes");
122 1         6 $data = { rc => 0, msg => "Response packed exceeds limit of $MAX_PACKET_SIZE bytes" };
123 1         4 $packed = $self->encode_message($data);
124             }
125              
126 833         5681 $self->{buffer} = pack("N", length($packed)).$packed;
127 833         1738 $self->{length} = length($self->{buffer});
128 833         1445 $self->{written} = 0;
129              
130 833         1367 1;
131             }
132              
133             sub write {
134 833     833 0 1328 my $self = shift;
135 833         1543 my ($blocking) = @_;
136              
137 833 100       3147 $self->get_sock->blocking($blocking?1:0);
138              
139             my $rc = syswrite (
140             $self->get_sock,
141             $self->{buffer},
142             $self->{length}-$self->{written},
143             $self->{written},
144 833         13840 );
145              
146 833 50       8083 $DEBUG && print "DEBUG: written rc=$rc\n";
147              
148 833 50       1958 return if not defined $rc;
149              
150 833         1627 $self->{written} += $rc;
151              
152 833 50       2014 if ( $self->{written} == $self->{length} ) {
153 833 50       1720 $DEBUG && print "DEBUG: write finished\n";
154 833         1409 $self->{buffer} = undef;
155 833         1222 $self->{length} = 0;
156 833         2925 return 1;
157             }
158              
159 0 0       0 $DEBUG && print "DEBUG: more to be written...\n";
160              
161 0         0 return;
162             }
163              
164             sub write_blocked {
165 684     684 0 1637 my $self = shift;
166 684         1200 my ($data) = @_;
167              
168 684         3161 $self->set_data($data);
169              
170 684         988 my $finished = 0;
171 684         3700 $finished = $self->write(1) while not $finished;
172              
173 684         1819 1;
174             }
175              
176             1;
177              
178             __END__