File Coverage

blib/lib/Net/STOMP/Client/IO.pm
Criterion Covered Total %
statement 29 135 21.4
branch 0 70 0.0
condition 0 33 0.0
subroutine 10 16 62.5
pod 4 4 100.0
total 43 258 16.6


line stmt bran cond sub pod time code
1             #+##############################################################################
2             # #
3             # File: Net/STOMP/Client/IO.pm #
4             # #
5             # Description: Input/Output support for Net::STOMP::Client #
6             # #
7             #-##############################################################################
8              
9             #
10             # module definition
11             #
12              
13             package Net::STOMP::Client::IO;
14 1     1   25 use 5.005; # need the four-argument form of substr()
  1         3  
15 1     1   25 use strict;
  1         3  
  1         37  
16 1     1   20 use warnings;
  1         3  
  1         95  
17             our $VERSION = "2.4";
18             our $REVISION = sprintf("%d.%02d", q$Revision: 2.2 $ =~ /(\d+)\.(\d+)/);
19              
20             #
21             # used modules
22             #
23              
24 1     1   8 use List::Util qw(min);
  1         2  
  1         78  
25 1     1   8 use No::Worries::Die qw(dief);
  1         2  
  1         6  
26 1     1   116 use No::Worries::Log qw(log_debug);
  1         3  
  1         6  
27 1     1   129 use POSIX qw(:errno_h);
  1         2  
  1         9  
28 1     1   409 use Time::HiRes qw();
  1         2  
  1         35  
29              
30             #
31             # constants
32             #
33              
34 1     1   6 use constant READ_LENGTH => 32_768; # chunk size for sysread()
  1         2  
  1         72  
35 1     1   7 use constant WRITE_LENGTH => 32_768; # minimum length for syswrite()
  1         3  
  1         1296  
36              
37             #+++############################################################################
38             # #
39             # private helpers #
40             # #
41             #---############################################################################
42              
43             #
44             # attempt to read data from the socket to the buffer
45             #
46             # note: we read at least once even if the buffer contains enough data
47             #
48             # common scenarios:
49             # - timeout=undef minlen=undef: loop until we successfully read once
50             # - timeout=undef minlen=N: loop until we read at least N bytes
51             # - timeout=0 minlen=undef: read only once (successful or not)
52             # - timeout=0 minlen=N: loop until we read >=N bytes or fail once
53             # - timeout=T minlen=undef: loop until timeout
54             # - timeout=T minlen=N: loop until we read >=N bytes or timeout
55             #
56              
57             sub _try_to_read ($$$) { ## no critic 'ProhibitExcessComplexity'
58 0     0     my($self, $timeout, $minlen) = @_;
59 0           my($maxtime, $total, $count, $sleeptime, $remaining);
60              
61 0           $self->{incoming_buflen} = length($self->{incoming_buffer});
62             # boundary conditions
63 0 0         if ($timeout) {
64 0 0         return(0) unless $timeout > 0;
65             # timer starts now
66 0           $maxtime = Time::HiRes::time() + $timeout;
67             }
68             # try to read, in a loop, until we are done
69 0           $total = 0;
70 0           while (1) {
71             # attempt to read once
72             $count = sysread($self->{socket}, $self->{incoming_buffer},
73 0           READ_LENGTH, $self->{incoming_buflen});
74 0 0         if (defined($count)) {
75             # we could read this time
76 0 0         unless ($count) {
77             # ... but we hit the EOF
78 0           $self->{error} = "cannot sysread(): EOF";
79 0           return($total);
80             }
81             # this is a normal successful read
82 0           $self->{incoming_time} = Time::HiRes::time();
83 0           $self->{incoming_buflen} += $count;
84 0           $total += $count;
85             # check if we have worked enough
86 0 0 0       return($total) unless $minlen and $total < $minlen;
87             } else {
88             # we could not read this time
89 0 0 0       if ($! != EAGAIN and $! != EWOULDBLOCK) {
90             # unexpected error
91 0           $self->{error} = "cannot sysread(): $!";
92 0           return(undef);
93             }
94             }
95             # check time
96 0 0         if (not defined($timeout)) {
    0          
97             # timeout = undef => loop forever until we are done
98 0           $sleeptime = 0.01;
99             } elsif ($timeout) {
100             # timeout > 0 => try again only if not too late
101 0           $remaining = $maxtime - Time::HiRes::time();
102 0 0         return($total) unless $remaining > 0;
103 0           $sleeptime = min($remaining, 0.01);
104             } else {
105             # timeout = 0 => try again unless last read failed
106 0 0         return($total) unless $count;
107             }
108             # sleep a bit...
109 0 0         Time::HiRes::sleep($sleeptime) unless $count;
110             }
111             }
112              
113             #
114             # attempt to write data from the queue and buffer to the socket
115             #
116             # common scenarios:
117             # - timeout=undef minlen=undef: loop until we successfully write once
118             # - timeout=undef minlen=N: loop until we write at least N bytes
119             # - timeout=0 minlen=undef: write only once (successful or not)
120             # - timeout=0 minlen=N: loop until we write >=N bytes or fail once
121             # - timeout=T minlen=undef: loop until timeout
122             # - timeout=T minlen=N: loop until we write >=N bytes or timeout
123             #
124              
125             sub _try_to_write ($$$) { ## no critic 'ProhibitExcessComplexity'
126 0     0     my($self, $timeout, $minlen) = @_;
127 0           my($maxtime, $total, $count, $sleeptime, $remaining, $data);
128              
129 0           $self->{outgoing_buflen} = length($self->{outgoing_buffer});
130             # boundary conditions
131 0 0 0       return(0) unless $self->{outgoing_buflen} or @{ $self->{outgoing_queue} };
  0            
132 0 0         if ($timeout) {
133 0 0         return(0) unless $timeout > 0;
134             # timer starts now
135 0           $maxtime = Time::HiRes::time() + $timeout;
136             }
137             # try to write, in a loop, until we are done
138 0           $total = 0;
139 0           while (1) {
140             # make sure there is enough data in the outgoing buffer
141 0   0       while ($self->{outgoing_buflen} < WRITE_LENGTH
142 0           and @{ $self->{outgoing_queue} }) {
143 0           $data = shift(@{ $self->{outgoing_queue} });
  0            
144 0           $self->{outgoing_buffer} .= ${ $data };
  0            
145 0           $self->{outgoing_buflen} += length(${ $data });
  0            
146             }
147 0 0         return($total) unless $self->{outgoing_buflen};
148             # attempt to write once
149             $count = syswrite($self->{socket}, $self->{outgoing_buffer},
150 0           $self->{outgoing_buflen});
151 0 0         if (defined($count)) {
152             # we could write this time
153 0 0         if ($count) {
154             # this is a normal successful write
155 0           $self->{outgoing_time} = Time::HiRes::time();
156 0           $self->{outgoing_buflen} -= $count;
157 0           $total += $count;
158 0           substr($self->{outgoing_buffer}, 0, $count, "");
159 0           $self->{outgoing_length} -= $count;
160             # check if we have worked enough
161             return($total) unless $self->{outgoing_buflen}
162 0 0 0       or @{ $self->{outgoing_queue} };
  0            
163 0 0 0       return($total) unless $minlen and $total < $minlen;
164             }
165             } else {
166             # we could not write this time
167 0 0 0       if ($! != EAGAIN and $! != EWOULDBLOCK) {
168             # unexpected error
169 0           $self->{error} = "cannot syswrite(): $!";
170 0           return(undef);
171             }
172             }
173             # check time
174 0 0         if (not defined($timeout)) {
    0          
175             # timeout = undef => loop forever until we are done
176 0           $sleeptime = 0.01;
177             } elsif ($timeout) {
178             # timeout > 0 => try again only if not too late
179 0           $remaining = $maxtime - Time::HiRes::time();
180 0 0         return($total) unless $remaining > 0;
181 0           $sleeptime = min($remaining, 0.01);
182             } else {
183             # timeout = 0 => try again unless last write failed
184 0 0         return($total) unless $count;
185             }
186             # sleep a bit...
187 0 0         Time::HiRes::sleep($sleeptime) unless $count;
188             }
189             }
190              
191             #+++############################################################################
192             # #
193             # object oriented interface #
194             # #
195             #---############################################################################
196              
197             #
198             # constructor
199             #
200              
201             sub new : method {
202 0     0 1   my($class, $socket) = @_;
203 0           my($self);
204              
205 0 0 0       dief("missing or invalid socket")
      0        
206             unless $socket and ref($socket) and $socket->isa("IO::Socket");
207 0           $socket->blocking(0);
208 0           $self = {};
209 0           $self->{socket} = $socket;
210 0           $self->{incoming_buffer} = "";
211 0           $self->{incoming_buflen} = 0;
212 0           $self->{outgoing_buffer} = "";
213 0           $self->{outgoing_buflen} = 0; # buffer length only
214 0           $self->{outgoing_queue} = [];
215 0           $self->{outgoing_length} = 0; # buffer + queue length
216 0           return(bless($self, $class));
217             }
218              
219             #
220             # queue the given data (a scalar reference!)
221             #
222              
223             sub queue_data : method {
224 0     0 1   my($self, $data) = @_;
225 0           my($length);
226              
227 0 0         dief("unexpected data: %s", $data) unless ref($data) eq "SCALAR";
228 0           $length = length(${ $data });
  0            
229 0 0         if ($length) {
230 0           push(@{ $self->{outgoing_queue} }, $data);
  0            
231 0           $self->{outgoing_length} += $length;
232             }
233 0           return($self->{outgoing_length});
234             }
235              
236             #
237             # send the queued data
238             #
239              
240             sub send_data : method {
241 0     0 1   my($self, %option) = @_;
242 0           my($minlen, $count);
243              
244 0 0         unless ($self->{error}) {
245             # send some data
246 0           $minlen = $self->{outgoing_length};
247 0           $count = _try_to_write($self, $option{timeout}, $minlen);
248             }
249 0 0         dief($self->{error}) unless defined($count);
250             # so far so good
251             log_debug("sent %d bytes", $count)
252 0 0 0       if $option{debug} and $option{debug} =~ /\b(io|all)\b/;
253 0           return($count);
254             }
255              
256             #
257             # receive some data
258             #
259              
260             sub receive_data : method {
261 0     0 1   my($self, %option) = @_;
262 0           my($minlen, $count);
263              
264 0 0         unless ($self->{error}) {
265             # receive some data
266 0 0         $minlen = $option{timeout} ? 1 : undef;
267 0           $count = _try_to_read($self, $option{timeout}, $minlen);
268             }
269 0 0         dief($self->{error}) unless defined($count);
270             # so far so good
271             log_debug("received %d bytes", $count)
272 0 0 0       if $option{debug} and $option{debug} =~ /\b(io|all)\b/;
273 0           return($count);
274             }
275              
276             1;
277              
278             __END__