| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | #----------------------------------------------------------------------- | 
| 2 |  |  |  |  |  |  | # Copyright (C) 2002-2015 by Jörn Reder . | 
| 3 |  |  |  |  |  |  | # All Rights Reserved. See file COPYRIGHT for details. | 
| 4 |  |  |  |  |  |  | # | 
| 5 |  |  |  |  |  |  | # This module is part of Event::RPC, which is free software; you can | 
| 6 |  |  |  |  |  |  | # redistribute it and/or modify it under the same terms as Perl itself. | 
| 7 |  |  |  |  |  |  | #----------------------------------------------------------------------- | 
| 8 |  |  |  |  |  |  |  | 
| 9 |  |  |  |  |  |  | package Event::RPC::Message; | 
| 10 |  |  |  |  |  |  |  | 
| 11 | 24 |  |  | 24 |  | 149 | use Carp; | 
|  | 24 |  |  |  |  | 50 |  | 
|  | 24 |  |  |  |  | 1865 |  | 
| 12 | 24 |  |  | 24 |  | 136 | use strict; | 
|  | 24 |  |  |  |  | 48 |  | 
|  | 24 |  |  |  |  | 574 |  | 
| 13 | 24 |  |  | 24 |  | 119 | use utf8; | 
|  | 24 |  |  |  |  | 44 |  | 
|  | 24 |  |  |  |  | 123 |  | 
| 14 |  |  |  |  |  |  |  | 
| 15 |  |  |  |  |  |  | my %DECODERS = ( | 
| 16 |  |  |  |  |  |  | STOR    => sub { require Storable; Storable::thaw($_[0])                    }, | 
| 17 |  |  |  |  |  |  | JSON    => sub { require JSON::XS; JSON::XS->new->allow_tags->decode($_[0]) }, | 
| 18 |  |  |  |  |  |  | CBOR    => sub { require CBOR::XS; CBOR::XS->new->decode($_[0])             }, | 
| 19 |  |  |  |  |  |  | SERL    => sub { require Sereal;   Sereal::decode_sereal($_[0])             }, | 
| 20 |  |  |  |  |  |  |  | 
| 21 |  |  |  |  |  |  | TEST    => sub { require Storable; Storable::thaw($_[0])                    }, | 
| 22 |  |  |  |  |  |  | ); | 
| 23 |  |  |  |  |  |  |  | 
| 24 |  |  |  |  |  |  | my %ENCODERS = ( | 
| 25 |  |  |  |  |  |  | STOR    => sub { require Storable; Storable::nfreeze ($_[0])                                                    }, | 
| 26 |  |  |  |  |  |  | JSON    => sub { require JSON::XS; '%E:R:JSON%'.JSON::XS->new->latin1->allow_blessed->allow_tags->encode($_[0]) }, | 
| 27 |  |  |  |  |  |  | CBOR    => sub { require CBOR::XS; '%E:R:CBOR%'.CBOR::XS->new->encode($_[0])                                    }, | 
| 28 |  |  |  |  |  |  | SERL    => sub { require Sereal;   '%E:R:SERL%'.Sereal::encode_sereal($_[0])                                    }, | 
| 29 |  |  |  |  |  |  |  | 
| 30 |  |  |  |  |  |  | TEST    => sub { "//NEGOTIATE(A,B,C)//" }, | 
| 31 |  |  |  |  |  |  | ); | 
| 32 |  |  |  |  |  |  |  | 
| 33 |  |  |  |  |  |  | my $DEBUG = 0; | 
| 34 |  |  |  |  |  |  | my $MAX_PACKET_SIZE = 2*1024*1024*1024; | 
| 35 |  |  |  |  |  |  |  | 
| 36 | 3951 |  |  | 3951 | 0 | 641936 | sub get_sock                    { shift->{sock}                         } | 
| 37 |  |  |  |  |  |  |  | 
| 38 | 0 |  |  | 0 | 0 | 0 | sub get_buffer                  { shift->{buffer}                       } | 
| 39 | 0 |  |  | 0 | 0 | 0 | sub get_length                  { shift->{length}                       } | 
| 40 | 0 |  |  | 0 | 0 | 0 | sub get_written                 { shift->{written}                      } | 
| 41 |  |  |  |  |  |  |  | 
| 42 | 0 |  |  | 0 | 0 | 0 | sub set_buffer                  { shift->{buffer}               = $_[1] } | 
| 43 | 0 |  |  | 0 | 0 | 0 | sub set_length                  { shift->{length}               = $_[1] } | 
| 44 | 0 |  |  | 0 | 0 | 0 | sub set_written                 { shift->{written}              = $_[1] } | 
| 45 |  |  |  |  |  |  |  | 
| 46 |  |  |  |  |  |  | sub get_max_packet_size { | 
| 47 | 1 |  |  | 1 | 0 | 4 | return $MAX_PACKET_SIZE; | 
| 48 |  |  |  |  |  |  | } | 
| 49 |  |  |  |  |  |  |  | 
| 50 |  |  |  |  |  |  | sub set_max_packet_size { | 
| 51 | 2 |  |  | 2 | 0 | 5 | my $class = shift; | 
| 52 | 2 |  |  |  |  | 5 | my ($value) = @_; | 
| 53 | 2 |  |  |  |  | 8 | $MAX_PACKET_SIZE = $value; | 
| 54 |  |  |  |  |  |  | } | 
| 55 |  |  |  |  |  |  |  | 
| 56 |  |  |  |  |  |  | sub new { | 
| 57 | 801 |  |  | 801 | 0 | 1723 | my $class = shift; | 
| 58 | 801 |  |  |  |  | 1619 | my ($sock) = @_; | 
| 59 |  |  |  |  |  |  |  | 
| 60 | 801 |  |  |  |  | 5966 | my $self = bless { | 
| 61 |  |  |  |  |  |  | sock    => $sock, | 
| 62 |  |  |  |  |  |  | buffer  => undef, | 
| 63 |  |  |  |  |  |  | length  => 0, | 
| 64 |  |  |  |  |  |  | written => 0, | 
| 65 |  |  |  |  |  |  | }, $class; | 
| 66 |  |  |  |  |  |  |  | 
| 67 | 801 |  |  |  |  | 2856 | return $self; | 
| 68 |  |  |  |  |  |  | } | 
| 69 |  |  |  |  |  |  |  | 
| 70 |  |  |  |  |  |  | sub read { | 
| 71 | 801 |  |  | 801 | 0 | 1260 | my $self = shift; | 
| 72 | 801 |  |  |  |  | 1239 | my ($blocking) = @_; | 
| 73 |  |  |  |  |  |  |  | 
| 74 | 801 | 100 |  |  |  | 2457 | $self->get_sock->blocking($blocking?1:0); | 
| 75 |  |  |  |  |  |  |  | 
| 76 | 801 | 50 |  |  |  | 8505 | if ( not defined $self->{buffer} ) { | 
| 77 | 801 |  |  |  |  | 1169 | my $length_packed; | 
| 78 | 801 | 50 |  |  |  | 1868 | $DEBUG && print "DEBUG: going to read header...\n"; | 
| 79 | 801 |  |  |  |  | 2019 | my $rc = sysread ($self->get_sock, $length_packed, 4); | 
| 80 | 801 | 50 |  |  |  | 3217 | $DEBUG && print "DEBUG: header read rc=$rc\n"; | 
| 81 | 801 | 100 | 66 |  |  | 6733 | die "DISCONNECTED" if !(defined $rc) || $rc == 0; | 
| 82 | 783 |  |  |  |  | 3705 | $self->{length} = unpack("N", $length_packed); | 
| 83 | 783 | 50 |  |  |  | 2142 | $DEBUG && print "DEBUG: packet size=$self->{length}\n"; | 
| 84 |  |  |  |  |  |  | die "Incoming message size exceeds limit of $MAX_PACKET_SIZE bytes" | 
| 85 | 783 | 50 |  |  |  | 2841 | if $self->{length} > $MAX_PACKET_SIZE; | 
| 86 |  |  |  |  |  |  | } | 
| 87 |  |  |  |  |  |  |  | 
| 88 | 783 |  | 50 |  |  | 4328 | my $buffer_length = length($self->{buffer}||''); | 
| 89 |  |  |  |  |  |  |  | 
| 90 | 783 | 50 |  |  |  | 1883 | $DEBUG && print "DEBUG: going to read packet... (buffer_length=$buffer_length)\n"; | 
| 91 |  |  |  |  |  |  |  | 
| 92 |  |  |  |  |  |  | my $rc = sysread ( | 
| 93 |  |  |  |  |  |  | $self->get_sock, | 
| 94 |  |  |  |  |  |  | $self->{buffer}, | 
| 95 | 783 |  |  |  |  | 2767 | $self->{length} - $buffer_length, | 
| 96 |  |  |  |  |  |  | $buffer_length | 
| 97 |  |  |  |  |  |  | ); | 
| 98 |  |  |  |  |  |  |  | 
| 99 | 783 | 50 |  |  |  | 2098 | $DEBUG && print "DEBUG: packet read rc=$rc\n"; | 
| 100 |  |  |  |  |  |  |  | 
| 101 | 783 | 50 |  |  |  | 1967 | return if not defined $rc; | 
| 102 | 783 | 50 |  |  |  | 1691 | die "DISCONNECTED" if $rc == 0; | 
| 103 |  |  |  |  |  |  |  | 
| 104 | 783 |  |  |  |  | 1676 | $buffer_length = length($self->{buffer}); | 
| 105 |  |  |  |  |  |  |  | 
| 106 |  |  |  |  |  |  | $DEBUG && print "DEBUG: more to read... ($self->{length} != $buffer_length)\n" | 
| 107 | 783 | 50 | 0 |  |  | 2021 | if $self->{length} != $buffer_length; | 
| 108 |  |  |  |  |  |  |  | 
| 109 | 783 | 50 |  |  |  | 1884 | return if $self->{length} != $buffer_length; | 
| 110 |  |  |  |  |  |  |  | 
| 111 | 783 | 50 |  |  |  | 1699 | $DEBUG && print "DEBUG: read finished, length=$buffer_length\n"; | 
| 112 |  |  |  |  |  |  |  | 
| 113 | 783 |  |  |  |  | 4415 | my $data = $self->decode_message($self->{buffer}); | 
| 114 |  |  |  |  |  |  |  | 
| 115 | 773 |  |  |  |  | 14308 | $self->{buffer} = undef; | 
| 116 | 773 |  |  |  |  | 1299 | $self->{length} = 0; | 
| 117 |  |  |  |  |  |  |  | 
| 118 | 773 |  |  |  |  | 3188 | return $data; | 
| 119 |  |  |  |  |  |  | } | 
| 120 |  |  |  |  |  |  |  | 
| 121 |  |  |  |  |  |  | sub read_blocked { | 
| 122 | 659 |  |  | 659 | 0 | 1013 | my $self = shift; | 
| 123 |  |  |  |  |  |  |  | 
| 124 | 659 |  |  |  |  | 764 | my $rc; | 
| 125 | 659 |  |  |  |  | 2625 | $rc = $self->read(1) while not defined $rc; | 
| 126 |  |  |  |  |  |  |  | 
| 127 | 652 |  |  |  |  | 2958 | return $rc; | 
| 128 |  |  |  |  |  |  | } | 
| 129 |  |  |  |  |  |  |  | 
| 130 |  |  |  |  |  |  | sub set_data { | 
| 131 | 783 |  |  | 783 | 0 | 1291 | my $self = shift; | 
| 132 | 783 |  |  |  |  | 1183 | my ($data) = @_; | 
| 133 |  |  |  |  |  |  |  | 
| 134 | 783 | 50 |  |  |  | 2109 | $DEBUG && print "DEBUG: Message->set_data($data)\n"; | 
| 135 |  |  |  |  |  |  |  | 
| 136 | 783 |  |  |  |  | 4035 | my $packed = $self->encode_message($data); | 
| 137 |  |  |  |  |  |  |  | 
| 138 | 783 | 100 |  |  |  | 17650 | if ( length($packed) > $MAX_PACKET_SIZE ) { | 
| 139 | 1 |  |  |  |  | 27 | Event::RPC::Server->instance->log("ERROR: response packet exceeds limit of $MAX_PACKET_SIZE bytes"); | 
| 140 | 1 |  |  |  |  | 8 | $data = { rc => 0, msg => "Response packed exceeds limit of $MAX_PACKET_SIZE bytes" }; | 
| 141 | 1 |  |  |  |  | 9 | $packed = $self->encode_message($data); | 
| 142 |  |  |  |  |  |  | } | 
| 143 |  |  |  |  |  |  |  | 
| 144 | 783 |  |  |  |  | 5560 | $self->{buffer} = pack("N", length($packed)).$packed; | 
| 145 | 783 |  |  |  |  | 1645 | $self->{length} = length($self->{buffer}); | 
| 146 | 783 |  |  |  |  | 1084 | $self->{written} = 0; | 
| 147 |  |  |  |  |  |  |  | 
| 148 | 783 |  |  |  |  | 1585 | 1; | 
| 149 |  |  |  |  |  |  | } | 
| 150 |  |  |  |  |  |  |  | 
| 151 |  |  |  |  |  |  | sub write { | 
| 152 | 783 |  |  | 783 | 0 | 1235 | my $self = shift; | 
| 153 | 783 |  |  |  |  | 1511 | my ($blocking) = @_; | 
| 154 |  |  |  |  |  |  |  | 
| 155 | 783 | 100 |  |  |  | 4655 | $self->get_sock->blocking($blocking?1:0); | 
| 156 |  |  |  |  |  |  |  | 
| 157 |  |  |  |  |  |  | my $rc = syswrite ( | 
| 158 |  |  |  |  |  |  | $self->get_sock, | 
| 159 |  |  |  |  |  |  | $self->{buffer}, | 
| 160 |  |  |  |  |  |  | $self->{length}-$self->{written}, | 
| 161 |  |  |  |  |  |  | $self->{written}, | 
| 162 | 783 |  |  |  |  | 10030 | ); | 
| 163 |  |  |  |  |  |  |  | 
| 164 | 783 | 50 |  |  |  | 2477 | $DEBUG && print "DEBUG: written rc=$rc\n"; | 
| 165 |  |  |  |  |  |  |  | 
| 166 | 783 | 50 |  |  |  | 2208 | return if not defined $rc; | 
| 167 |  |  |  |  |  |  |  | 
| 168 | 783 |  |  |  |  | 1877 | $self->{written} += $rc; | 
| 169 |  |  |  |  |  |  |  | 
| 170 | 783 | 50 |  |  |  | 2285 | if ( $self->{written} == $self->{length} ) { | 
| 171 | 783 | 50 |  |  |  | 1905 | $DEBUG && print "DEBUG: write finished\n"; | 
| 172 | 783 |  |  |  |  | 1437 | $self->{buffer} = undef; | 
| 173 | 783 |  |  |  |  | 1944 | $self->{length} = 0; | 
| 174 | 783 |  |  |  |  | 3291 | return 1; | 
| 175 |  |  |  |  |  |  | } | 
| 176 |  |  |  |  |  |  |  | 
| 177 | 0 | 0 |  |  |  | 0 | $DEBUG && print "DEBUG: more to be written...\n"; | 
| 178 |  |  |  |  |  |  |  | 
| 179 | 0 |  |  |  |  | 0 | return; | 
| 180 |  |  |  |  |  |  | } | 
| 181 |  |  |  |  |  |  |  | 
| 182 |  |  |  |  |  |  | sub write_blocked { | 
| 183 | 659 |  |  | 659 | 0 | 1109 | my $self = shift; | 
| 184 | 659 |  |  |  |  | 953 | my ($data) = @_; | 
| 185 |  |  |  |  |  |  |  | 
| 186 | 659 |  |  |  |  | 4176 | $self->set_data($data); | 
| 187 |  |  |  |  |  |  |  | 
| 188 | 659 |  |  |  |  | 963 | my $finished = 0; | 
| 189 | 659 |  |  |  |  | 3297 | $finished = $self->write(1) while not $finished; | 
| 190 |  |  |  |  |  |  |  | 
| 191 | 659 |  |  |  |  | 1918 | 1; | 
| 192 |  |  |  |  |  |  | } | 
| 193 |  |  |  |  |  |  |  | 
| 194 |  |  |  |  |  |  | 1; | 
| 195 |  |  |  |  |  |  |  | 
| 196 |  |  |  |  |  |  | __END__ |