File Coverage

blib/lib/Thrift/MultiplexedProcessor.pm
Criterion Covered Total %
statement 69 77 89.6
branch 3 8 37.5
condition 1 3 33.3
subroutine 16 17 94.1
pod 0 4 0.0
total 89 109 81.6


line stmt bran cond sub pod time code
1             #
2             # Licensed to the Apache Software Foundation (ASF) under one
3             # or more contributor license agreements. See the NOTICE file
4             # distributed with this work for additional information
5             # regarding copyright ownership. The ASF licenses this file
6             # to you under the Apache License, Version 2.0 (the
7             # "License"); you may not use this file except in compliance
8             # with the License. You may obtain a copy of the License at
9             #
10             # http://www.apache.org/licenses/LICENSE-2.0
11             #
12             # Unless required by applicable law or agreed to in writing,
13             # software distributed under the License is distributed on an
14             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15             # KIND, either express or implied. See the License for the
16             # specific language governing permissions and limitations
17             # under the License.
18             #
19              
20 1     1   462 use 5.10.0;
  1         4  
21 1     1   7 use strict;
  1         1  
  1         36  
22 1     1   6 use warnings;
  1         3  
  1         24  
23              
24 1     1   5 use Thrift;
  1         2  
  1         30  
25 1     1   7 use Thrift::MessageType;
  1         2  
  1         19  
26 1     1   441 use Thrift::MultiplexedProtocol;
  1         4  
  1         29  
27 1     1   6 use Thrift::Protocol;
  1         2  
  1         16  
28 1     1   5 use Thrift::ProtocolDecorator;
  1         2  
  1         29  
29              
30             package Thrift::StoredMessageProtocol;
31 1     1   5 use base qw(Thrift::ProtocolDecorator);
  1         3  
  1         86  
32 1     1   7 use version 0.77; our $VERSION = version->declare("$Thrift::VERSION");
  1         15  
  1         5  
33              
34             sub new {
35 11     11   33 my $classname = shift;
36 11         15 my $protocol = shift;
37 11         17 my $fname = shift;
38 11         16 my $mtype = shift;
39 11         15 my $rseqid = shift;
40 11         32 my $self = $classname->SUPER::new($protocol);
41              
42 11         22 $self->{fname} = $fname;
43 11         19 $self->{mtype} = $mtype;
44 11         17 $self->{rseqid} = $rseqid;
45              
46 11         36 return bless($self,$classname);
47             }
48              
49             sub readMessageBegin
50             {
51 11     11   18 my $self = shift;
52 11         11 my $name = shift;
53 11         17 my $type = shift;
54 11         15 my $seqid = shift;
55              
56 11         15 $$name = $self->{fname};
57 11         20 $$type = $self->{mtype};
58 11         20 $$seqid = $self->{rseqid};
59             }
60              
61             package Thrift::MultiplexedProcessor;
62 1     1   286 use version 0.77; our $VERSION = version->declare("$Thrift::VERSION");
  1         23  
  1         6  
63              
64             sub new {
65 1     1 0 7 my $classname = shift;
66 1         2 my $self = {};
67              
68 1         4 $self->{serviceProcessorMap} = {};
69 1         3 $self->{defaultProcessor} = undef;
70              
71 1         2 return bless($self,$classname);
72             }
73              
74             sub defaultProcessor {
75 0     0 0 0 my $self = shift;
76 0         0 my $processor = shift;
77              
78 0         0 $self->{defaultProcessor} = $processor;
79             }
80              
81             sub registerProcessor {
82 2     2 0 9 my $self = shift;
83 2         2 my $serviceName = shift;
84 2         4 my $processor = shift;
85              
86 2         9 $self->{serviceProcessorMap}->{$serviceName} = $processor;
87             }
88              
89             sub process {
90 11     11 0 451 my $self = shift;
91 11         17 my $input = shift;
92 11         13 my $output = shift;
93              
94             #
95             # Use the actual underlying protocol (e.g. BinaryProtocol) to read the
96             # message header. This pulls the message "off the wire", which we'll
97             # deal with at the end of this method.
98             #
99              
100 11         17 my ($fname, $mtype, $rseqid);
101 11         53 $input->readMessageBegin(\$fname, \$mtype, \$rseqid);
102              
103 11 50 33     29 if ($mtype ne Thrift::TMessageType::CALL && $mtype ne Thrift::TMessageType::ONEWAY) {
104 0         0 die Thrift::TException->new('This should not have happened!?');
105             }
106              
107             # Extract the service name and the new Message name.
108 11 50       32 if (index($fname, Thrift::MultiplexedProtocol::SEPARATOR) == -1) {
109 0 0       0 if (defined $self->{defaultProcessor}) {
110             return $self->{defaultProcessor}->process(
111 0         0 Thrift::StoredMessageProtocol->new($input, $fname, $mtype, $rseqid), $output
112             );
113             } else {
114 0         0 die Thrift::TException->new("Service name not found in message name: {$fname} and no default processor defined. Did you " .
115             'forget to use a MultiplexProtocol in your client?');
116             }
117             }
118              
119 11         35 (my $serviceName, my $messageName) = split(':', $fname, 2);
120              
121 11 50       31 if (!exists($self->{serviceProcessorMap}->{$serviceName})) {
122 0         0 die Thrift::TException->new("Service name not found: {$serviceName}. Did you forget " .
123             'to call registerProcessor()?');
124             }
125              
126             # Dispatch processing to the stored processor
127 11         18 my $processor = $self->{serviceProcessorMap}->{$serviceName};
128 11         31 return $processor->process(
129             Thrift::StoredMessageProtocol->new($input, $messageName, $mtype, $rseqid), $output
130             );
131             }
132              
133             1;