File Coverage

blib/lib/AnyEvent/OWNet.pm
Criterion Covered Total %
statement 195 216 90.2
branch 53 64 82.8
condition 5 10 50.0
subroutine 34 39 87.1
pod 15 15 100.0
total 302 344 87.7


line stmt bran cond sub pod time code
1 3     3   41659 use strict;
  3         6  
  3         114  
2 3     3   14 use warnings;
  3         5  
  3         170  
3             package AnyEvent::OWNet;
4             $AnyEvent::OWNet::VERSION = '1.163170';
5             # ABSTRACT: Client for 1-wire File System server
6              
7              
8 3     3   64 use 5.008;
  3         10  
9 3     3   17 use constant DEBUG => $ENV{ANYEVENT_OWNET_DEBUG};
  3         6  
  3         266  
10 3     3   1157 use AnyEvent;
  3         5025  
  3         103  
11 3     3   989 use AnyEvent::Handle;
  3         20944  
  3         103  
12 3     3   811 use AnyEvent::Socket;
  3         13232  
  3         402  
13 3     3   22 use Carp qw/croak/;
  3         5  
  3         166  
14 3     3   609 use Sub::Name;
  3         552  
  3         183  
15 3     3   1927 use Try::Tiny;
  3         3453  
  3         186  
16              
17 3     3   1491 use AnyEvent::OWNet::Constants;
  3         7  
  3         12  
18              
19 3     3   1650 use AnyEvent::OWNet::Response;
  3         7  
  3         7089  
20              
21              
22             sub new {
23 6     6 1 3119 my ($pkg, %p) = @_;
24 6         55 bless
25             {
26             connect_queue => [],
27             host => '127.0.0.1',
28             port => 4304,
29             timeout => 5,
30             %p,
31             }, $pkg;
32             }
33              
34             sub _msg {
35 23     23   771 my ($self, $req) = @_;
36 23 100       47 my $version = exists $req->{version} ? $req->{version} : 0;
37 23 100       50 my $data = exists $req->{data} ? $req->{data} : '';
38 23         28 my $payload = length $data;
39             my $type =
40 23 100       45 exists $req->{type} ? $req->{type} : OWNET_MSG_READ; # default to read
41 23 100       39 my $sg = exists $req->{sg} ? $req->{sg} : OWNET_DEFAULT_FLAGS;
42 23 100       37 my $size = exists $req->{size} ? $req->{size} : OWNET_DEFAULT_DATA_SIZE;
43 23 100       32 my $offset = exists $req->{offset} ? $req->{offset} : 0;
44 23         119 return pack 'N6a*', $version, $payload, $type, $sg, $size, $offset, $data;
45             }
46              
47              
48             sub read {
49 3     3 1 489 my ($self, $path, $sub) = @_;
50 3         20 $self->_run_cmd({ data => $path.chr(0), type => OWNET_MSG_READ }, $sub);
51             }
52              
53              
54             sub write {
55 2     2 1 196 my ($self, $path, $value, $sub) = @_;
56 2         13 $self->_run_cmd({ data => $path.chr(0).$value,
57             size => length $value,
58             type => OWNET_MSG_WRITE }, $sub);
59             }
60              
61              
62             sub dir {
63 6     6 1 526 my ($self, $path, $sub) = @_;
64 6         37 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_DIR, size => 0 },
65             $sub);
66             }
67              
68              
69             sub present {
70 2     2 1 750 my ($self, $path, $sub) = @_;
71 2         10 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_PRESENT }, $sub);
72             }
73              
74              
75             sub dirall {
76 1     1 1 209 my ($self, $path, $sub) = @_;
77 1         5 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_DIRALL }, $sub);
78             }
79              
80              
81             sub get {
82 2     2 1 639 my ($self, $path, $sub) = @_;
83 2         10 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_GET }, $sub);
84             }
85              
86              
87             sub dirallslash {
88 1     1 1 423 my ($self, $path, $sub) = @_;
89 1         5 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_DIRALLSLASH }, $sub);
90             }
91              
92              
93             sub getslash {
94 5     5 1 277 my ($self, $path, $sub) = @_;
95 5         19 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_GETSLASH }, $sub);
96             }
97              
98             sub _run_cmd {
99 22     22   41 my $self = shift;
100 22         22 my $cmd = shift;
101              
102 22         24 my ($cv, $cb);
103 22 50       63 if (@_) {
104 22 50       79 $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
105 22 100       60 $cb = pop if ref $_[-1] eq 'CODE';
106             }
107              
108 22   33     642 $cv ||= AnyEvent->condvar;
109              
110 22         158 print STDERR "using condvar $cv\n" if DEBUG;
111              
112             $cv->cb(subname 'command_cb' => sub {
113 7     7   38 my $cv = shift;
114 7         6 print STDERR "calling callback $cv\n" if DEBUG;
115             try {
116 7         254 my $res = $cv->recv;
117 7         47 $cb->($res);
118             } catch {
119 0   0     0 ($self->{on_error} || sub { die "ARGH: @_\n"; })->($_);
120             }
121 22 100       119 }) if $cb;
  7         50  
122              
123              
124 22         67 print STDERR 'Running command, ', $cmd->{type}, "\n" if DEBUG;
125 22 100       59 if( $self->{handle} ){
126 14         60 $self->_command_sub($cmd, $cv);
127             } else {
128 8         21 $self->connect($cmd, $cv );
129             }
130              
131 22         57 return $cv;
132             }
133              
134       0     sub DESTROY { }
135              
136              
137             sub all_cv {
138 39     39 1 40 my $self = shift;
139 39 50       83 $self->{all_cv} = shift if @_;
140 39 100       87 unless ($self->{all_cv}) {
141 5         135 $self->{all_cv} = AnyEvent->condvar;
142 5     0   37 $self->{all_cv}->cb( sub { print STDERR "all_cv done\n" } ) if DEBUG;
  0         0  
143             }
144 39         108 $self->{all_cv};
145             }
146              
147              
148             sub cleanup {
149 3     3 1 4 my $self = shift;
150 3         4 print STDERR "cleanup\n" if DEBUG;
151 3 100       35 $self->{all_cv}->croak(@_) if ($self->{all_cv});
152 3 100       34 $self->{connect_cv}->croak( @_ ) if $self->{connect_cv};
153              
154 3         11 while (@{$self->{connect_queue}}) {
  4         17  
155 1         3 my $queue = shift @{$self->{connect_queue}};
  1         3  
156 1         2 my($cmd, $cv) = @$queue;
157 1         4 $cv->croak(@_);
158             }
159              
160 3         7 delete $self->{all_cv};
161 3         5 delete $self->{connect_cv};
162 3         15 delete $self->{sock};
163 3         19 delete $self->{handle};
164 3 100       21 $self->{on_error}->(@_) if $self->{on_error};
165             }
166              
167              
168             sub connect {
169 8     8 1 16 my( $self, $cmd, $cv ) = @_;
170              
171 8 50       19 push @{$self->{connect_queue}}, [ $cmd, $cv ]
  8         26  
172             if @_;
173              
174             # setup already ongoing
175             return $self->{connect_cv}
176 8 100       27 if $self->{connect_cv};
177              
178             # or start setup
179 6         146 $self->{connect_cv} = AnyEvent->condvar;
180 6     0   29 $self->{connect_cv}->cb( sub { print STDERR "connect_cv done\n" } ) if DEBUG;
  0         0  
181              
182             $self->{sock} = tcp_connect $self->{host}, $self->{port},
183             subname 'tcp_connect_cb' => sub {
184              
185             my $fh = shift
186 6 100   6   748 or do {
187 1         5 my $err = "Can't connect owserver: $!";
188 1         3 $self->cleanup($err);
189             return
190 1         19 };
191              
192 5         6 warn "Connected\n" if DEBUG;
193              
194             $self->{handle} = AnyEvent::Handle->new(
195             fh => $fh,
196             on_error => subname('on_error_cb' => sub {
197 1         17 print STDERR "handle error $_[2]\n" if DEBUG;
198 1         4 $_[0]->destroy;
199 1 50       30 if ($_[1]) {
200 1         4 $self->cleanup('Error: '.$_[2]);
201             }
202             }),
203             on_eof => subname('on_eof_cb' => sub {
204 0         0 print STDERR "handle eof\n" if DEBUG;
205 0         0 $_[0]->destroy;
206 0         0 $self->cleanup('Connection closed');
207             }),
208             on_timeout => subname('on_timeout_cb' => sub {
209 1         100873 print STDERR "handle timeout\n" if DEBUG;
210 1         8 $_[0]->destroy;
211 1         230 $self->cleanup('Socket timeout');
212             })
213 5         115 );
214 5         376 while (@{$self->{connect_queue}}) {
  12         38  
215 7         8 my $queue = shift @{$self->{connect_queue}};
  7         14  
216 7         19 $self->_command_sub(@$queue );
217             }
218 5         21 $self->{connect_cv}->send(1);
219 5         100 delete $self->{connect_cv};
220 6         88 };
221              
222 6         2672 return $self->{connect_cv};
223             }
224              
225              
226             sub _command_sub {
227 21     21   25 my( $self, $command, $cv ) = @_;
228              
229 21         38 $self->all_cv->begin;
230              
231 21         98 my $msg = $self->_msg($command);
232 21         19 print STDERR "sending command ", $command->{type}, "\n" if DEBUG;
233 21         14 warn 'Sending: ', (unpack 'H*', $msg), "\n" if DEBUG;
234              
235 21         88 $self->{handle}->push_write($msg);
236 21         1370 $self->{handle}->timeout($self->{timeout});
237              
238             $self->{handle}->push_read(ref $self, $command => subname 'push_read_cb' => sub {
239 18     18   20 my($handle, $res, $err) = @_;
240 18         42 $handle->timeout(0);
241 18         211 print STDERR "read finished $cv\n" if DEBUG;
242 18         15 print STDERR "read ",
243             ($cv->ready ? "ready" : "not ready"), "\n" if DEBUG;
244 18         34 $self->all_cv->end;
245 18 50       235 if ($err) {
246 0         0 print STDERR "returning error $err\n" if DEBUG;
247 0         0 return $cv->croak($res)
248             }
249 18         15 print STDERR "Sending $res\n" if DEBUG;
250 18         38 $cv->send($res);
251 21         820 });
252              
253 21         615 return $cv;
254             }
255              
256              
257             sub devices {
258 4     4 1 227 my ($self, $cb, $offset, $cv) = @_;
259 4   100     15 $offset ||= '/';
260 4   66     50 $cv ||= AnyEvent->condvar;
261 4         11 print STDERR "devices: $offset\n" if DEBUG;
262 4         11 $cv->begin;
263             $self->getslash($offset, subname 'devices_getslash_cb' => sub {
264 4     4   4 my $res = shift;
265 4 50       10 if ($res->is_success) {
266 4         9 foreach my $d ($res->data_list) {
267 14 100       48 if ($d =~ m!^.*/[0-9a-f]{2}\.[0-9a-f]{12}/$!i) {
    100          
268 2         5 $cb->($d, $cv);
269 2         6 $self->devices($cb, $d, $cv);
270             } elsif ($d =~ m!/(?:main|aux)/$!) {
271 1         2 $self->devices($cb, $d, $cv);
272             }
273             }
274             } # TOFIX: propagate error?
275 4         10 $cv->end;
276 4         41 });
277 4         8 $cv;
278             }
279              
280              
281             sub device_files {
282 0     0 1 0 my ($self, $cb, $files, $offset, $cv) = @_;
283 0 0       0 $files = [$files] unless (ref $files);
284             $cv = $self->devices(subname('device_files_devices_cb' => sub {
285 0     0   0 my $dev = shift;
286 0         0 foreach my $file (@$files) {
287 0         0 $cv->begin;
288             $self->get($dev.$file,
289             subname 'device_files_get_cb' => sub {
290 0         0 my $res = shift;
291 0         0 $cv->end;
292 0         0 my $value = $res->{data};
293 0 0       0 return unless (defined $value);
294 0         0 $cb->($dev, $file, 0+$value);
295 0         0 });
296             }
297 0         0 }), $offset, $cv);
298             }
299              
300              
301             sub anyevent_read_type {
302 21     21 1 174 my ($handle, $cb, $command) = @_;
303              
304 21         26 my $MAX_RETURN = 66000;
305 21         19 my @data;
306             subname 'anyevent_read_type_reader' => sub {
307 41     41   214527 my $rbuf = \$handle->{rbuf};
308              
309 47 100       126 REDO:
310             return unless (defined $$rbuf);
311 40         41 my $len;
312              
313             my %result;
314 0         0 my $header;
315             do {
316 40         42 $len = length $$rbuf;
317 40         28 print STDERR "read_type has $len bytes\n" if DEBUG;
318 40         47 print STDERR "read_type has ", (unpack 'H*', $$rbuf), "\n" if DEBUG;
319 40 100       89 return unless ($len >= 24);
320 25         106 @result{qw/version payload ret sg size offset/} = unpack 'N6', $$rbuf;
321 25         58 $header = substr $$rbuf, 0, 24, '';
322 25         20 print STDERR "read_type header ", (unpack 'H*', $header), "\n" if DEBUG;
323 25 100       77 if ($result{'ret'} > $MAX_RETURN) {
324 4         22 $cb->($handle, AnyEvent::OWNet::Response->new(%result));
325 4         30 return 1;
326             }
327 40         35 } while ($result{payload} > $MAX_RETURN);
328              
329 21         23 my $total_len = 24 + $result{payload};
330 21         18 print STDERR "read_type have ", $len, " need ", $total_len, "\n" if DEBUG;
331 21 100       35 unless ($len >= $total_len) {
332 1         2 $$rbuf = $header.$$rbuf;
333 1         4 return;
334             }
335              
336 20         33 my $data = substr $$rbuf, 0, $result{payload}, '';
337 20 100       39 if ($command->{type} == OWNET_MSG_DIR) {
338 8 100       46 if ($data eq '') {
339 2         4 $result{data} = \@data;
340             } else {
341 6         12 push @data, substr $data, 0, -1;
342 6         22 goto REDO;
343             }
344             } else {
345 12         19 $result{data} = $data;
346             }
347 14         10 print STDERR "read_type complete\n" if DEBUG;
348 14         77 $cb->($handle, AnyEvent::OWNet::Response->new(%result));
349 14         546 return 1;
350             }
351 21         171 }
352              
353             1;
354              
355             __END__