File Coverage

blib/lib/Event/RPC/Message.pm
Criterion Covered Total %
statement 78 86 90.7
branch 25 44 56.8
condition 3 8 37.5
subroutine 12 18 66.6
pod 0 15 0.0
total 118 171 69.0


line stmt bran cond sub pod time code
1             #-----------------------------------------------------------------------
2             # Copyright (C) 2002-2015 by Jörn Reder .
3             # All Rights Reserved. See file COPYRIGHT for details.
4             #
5             # This module is part of Event::RPC, which is free software; you can
6             # redistribute it and/or modify it under the same terms as Perl itself.
7             #-----------------------------------------------------------------------
8              
9             package Event::RPC::Message;
10              
11 24     24   149 use Carp;
  24         50  
  24         1865  
12 24     24   136 use strict;
  24         48  
  24         574  
13 24     24   119 use utf8;
  24         44  
  24         123  
14              
15             my %DECODERS = (
16             STOR => sub { require Storable; Storable::thaw($_[0]) },
17             JSON => sub { require JSON::XS; JSON::XS->new->allow_tags->decode($_[0]) },
18             CBOR => sub { require CBOR::XS; CBOR::XS->new->decode($_[0]) },
19             SERL => sub { require Sereal; Sereal::decode_sereal($_[0]) },
20              
21             TEST => sub { require Storable; Storable::thaw($_[0]) },
22             );
23              
24             my %ENCODERS = (
25             STOR => sub { require Storable; Storable::nfreeze ($_[0]) },
26             JSON => sub { require JSON::XS; '%E:R:JSON%'.JSON::XS->new->latin1->allow_blessed->allow_tags->encode($_[0]) },
27             CBOR => sub { require CBOR::XS; '%E:R:CBOR%'.CBOR::XS->new->encode($_[0]) },
28             SERL => sub { require Sereal; '%E:R:SERL%'.Sereal::encode_sereal($_[0]) },
29              
30             TEST => sub { "//NEGOTIATE(A,B,C)//" },
31             );
32              
33             my $DEBUG = 0;
34             my $MAX_PACKET_SIZE = 2*1024*1024*1024;
35              
36 3951     3951 0 641936 sub get_sock { shift->{sock} }
37              
38 0     0 0 0 sub get_buffer { shift->{buffer} }
39 0     0 0 0 sub get_length { shift->{length} }
40 0     0 0 0 sub get_written { shift->{written} }
41              
42 0     0 0 0 sub set_buffer { shift->{buffer} = $_[1] }
43 0     0 0 0 sub set_length { shift->{length} = $_[1] }
44 0     0 0 0 sub set_written { shift->{written} = $_[1] }
45              
46             sub get_max_packet_size {
47 1     1 0 4 return $MAX_PACKET_SIZE;
48             }
49              
50             sub set_max_packet_size {
51 2     2 0 5 my $class = shift;
52 2         5 my ($value) = @_;
53 2         8 $MAX_PACKET_SIZE = $value;
54             }
55              
56             sub new {
57 801     801 0 1723 my $class = shift;
58 801         1619 my ($sock) = @_;
59              
60 801         5966 my $self = bless {
61             sock => $sock,
62             buffer => undef,
63             length => 0,
64             written => 0,
65             }, $class;
66              
67 801         2856 return $self;
68             }
69              
70             sub read {
71 801     801 0 1260 my $self = shift;
72 801         1239 my ($blocking) = @_;
73              
74 801 100       2457 $self->get_sock->blocking($blocking?1:0);
75            
76 801 50       8505 if ( not defined $self->{buffer} ) {
77 801         1169 my $length_packed;
78 801 50       1868 $DEBUG && print "DEBUG: going to read header...\n";
79 801         2019 my $rc = sysread ($self->get_sock, $length_packed, 4);
80 801 50       3217 $DEBUG && print "DEBUG: header read rc=$rc\n";
81 801 100 66     6733 die "DISCONNECTED" if !(defined $rc) || $rc == 0;
82 783         3705 $self->{length} = unpack("N", $length_packed);
83 783 50       2142 $DEBUG && print "DEBUG: packet size=$self->{length}\n";
84             die "Incoming message size exceeds limit of $MAX_PACKET_SIZE bytes"
85 783 50       2841 if $self->{length} > $MAX_PACKET_SIZE;
86             }
87              
88 783   50     4328 my $buffer_length = length($self->{buffer}||'');
89              
90 783 50       1883 $DEBUG && print "DEBUG: going to read packet... (buffer_length=$buffer_length)\n";
91              
92             my $rc = sysread (
93             $self->get_sock,
94             $self->{buffer},
95 783         2767 $self->{length} - $buffer_length,
96             $buffer_length
97             );
98              
99 783 50       2098 $DEBUG && print "DEBUG: packet read rc=$rc\n";
100              
101 783 50       1967 return if not defined $rc;
102 783 50       1691 die "DISCONNECTED" if $rc == 0;
103              
104 783         1676 $buffer_length = length($self->{buffer});
105              
106             $DEBUG && print "DEBUG: more to read... ($self->{length} != $buffer_length)\n"
107 783 50 0     2021 if $self->{length} != $buffer_length;
108              
109 783 50       1884 return if $self->{length} != $buffer_length;
110              
111 783 50       1699 $DEBUG && print "DEBUG: read finished, length=$buffer_length\n";
112              
113 783         4415 my $data = $self->decode_message($self->{buffer});
114              
115 773         14308 $self->{buffer} = undef;
116 773         1299 $self->{length} = 0;
117              
118 773         3188 return $data;
119             }
120              
121             sub read_blocked {
122 659     659 0 1013 my $self = shift;
123              
124 659         764 my $rc;
125 659         2625 $rc = $self->read(1) while not defined $rc;
126              
127 652         2958 return $rc;
128             }
129              
130             sub set_data {
131 783     783 0 1291 my $self = shift;
132 783         1183 my ($data) = @_;
133              
134 783 50       2109 $DEBUG && print "DEBUG: Message->set_data($data)\n";
135              
136 783         4035 my $packed = $self->encode_message($data);
137              
138 783 100       17650 if ( length($packed) > $MAX_PACKET_SIZE ) {
139 1         27 Event::RPC::Server->instance->log("ERROR: response packet exceeds limit of $MAX_PACKET_SIZE bytes");
140 1         8 $data = { rc => 0, msg => "Response packed exceeds limit of $MAX_PACKET_SIZE bytes" };
141 1         9 $packed = $self->encode_message($data);
142             }
143              
144 783         5560 $self->{buffer} = pack("N", length($packed)).$packed;
145 783         1645 $self->{length} = length($self->{buffer});
146 783         1084 $self->{written} = 0;
147              
148 783         1585 1;
149             }
150              
151             sub write {
152 783     783 0 1235 my $self = shift;
153 783         1511 my ($blocking) = @_;
154              
155 783 100       4655 $self->get_sock->blocking($blocking?1:0);
156              
157             my $rc = syswrite (
158             $self->get_sock,
159             $self->{buffer},
160             $self->{length}-$self->{written},
161             $self->{written},
162 783         10030 );
163              
164 783 50       2477 $DEBUG && print "DEBUG: written rc=$rc\n";
165              
166 783 50       2208 return if not defined $rc;
167              
168 783         1877 $self->{written} += $rc;
169              
170 783 50       2285 if ( $self->{written} == $self->{length} ) {
171 783 50       1905 $DEBUG && print "DEBUG: write finished\n";
172 783         1437 $self->{buffer} = undef;
173 783         1944 $self->{length} = 0;
174 783         3291 return 1;
175             }
176              
177 0 0       0 $DEBUG && print "DEBUG: more to be written...\n";
178              
179 0         0 return;
180             }
181              
182             sub write_blocked {
183 659     659 0 1109 my $self = shift;
184 659         953 my ($data) = @_;
185              
186 659         4176 $self->set_data($data);
187              
188 659         963 my $finished = 0;
189 659         3297 $finished = $self->write(1) while not $finished;
190              
191 659         1918 1;
192             }
193              
194             1;
195              
196             __END__