File Coverage

blib/lib/AnyEvent/OWNet.pm
Criterion Covered Total %
statement 191 211 90.5
branch 52 62 83.8
condition 6 10 60.0
subroutine 31 34 91.1
pod 15 15 100.0
total 295 332 88.8


line stmt bran cond sub pod time code
1 3     3   59774 use strict;
  3         8  
  3         164  
2 3     3   15 use warnings;
  3         6  
  3         140  
3             package AnyEvent::OWNet;
4             BEGIN {
5 3     3   124 $AnyEvent::OWNet::VERSION = '1.110430';
6             }
7              
8             # ABSTRACT: Client for 1-wire File System server
9              
10              
11 3     3   131 use 5.008;
  3         9  
  3         206  
12 3     3   17 use constant DEBUG => $ENV{ANYEVENT_OWNET_DEBUG};
  3         6  
  3         351  
13 3     3   2226 use AnyEvent;
  3         8444  
  3         80  
14 3     3   1260 use AnyEvent::Handle;
  3         25955  
  3         94  
15 3     3   1620 use AnyEvent::Socket;
  3         27623  
  3         469  
16 3     3   27 use Carp qw/croak/;
  3         6  
  3         152  
17 3     3   3164 use Try::Tiny;
  3         4977  
  3         203  
18              
19 3     3   2002 use AnyEvent::OWNet::Constants;
  3         7  
  3         19  
20              
21 3     3   1936 use AnyEvent::OWNet::Response;
  3         9  
  3         9565  
22              
23              
24             sub new {
25 6     6 1 3528 my ($pkg, %p) = @_;
26 6         63 bless
27             {
28             connect_queue => [],
29             host => '127.0.0.1',
30             port => 4304,
31             timeout => 5,
32             %p,
33             }, $pkg;
34             }
35              
36             sub _msg {
37 23     23   567 my ($self, $req) = @_;
38 23 100       65 my $version = exists $req->{version} ? $req->{version} : 0;
39 23 100       77 my $data = exists $req->{data} ? $req->{data} : '';
40 23         49 my $payload = length $data;
41 23 100       64 my $type =
42             exists $req->{type} ? $req->{type} : OWNET_MSG_READ; # default to read
43 23 100       54 my $sg = exists $req->{sg} ? $req->{sg} : OWNET_DEFAULT_FLAGS;
44 23 100       52 my $size = exists $req->{size} ? $req->{size} : OWNET_DEFAULT_DATA_SIZE;
45 23 100       64 my $offset = exists $req->{offset} ? $req->{offset} : 0;
46 23         167 return pack 'N6a*', $version, $payload, $type, $sg, $size, $offset, $data;
47             }
48              
49              
50             sub read {
51 3     3 1 1617 my ($self, $path, $sub) = @_;
52 3         25 $self->_run_cmd({ data => $path.chr(0), type => OWNET_MSG_READ }, $sub);
53             }
54              
55              
56             sub write {
57 2     2 1 276 my ($self, $path, $value, $sub) = @_;
58 2         19 $self->_run_cmd({ data => $path.chr(0).$value,
59             size => length $value,
60             type => OWNET_MSG_WRITE }, $sub);
61             }
62              
63              
64             sub dir {
65 6     6 1 1737 my ($self, $path, $sub) = @_;
66 6         45 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_DIR, size => 0 },
67             $sub);
68             }
69              
70              
71             sub present {
72 2     2 1 938 my ($self, $path, $sub) = @_;
73 2         11 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_PRESENT }, $sub);
74             }
75              
76              
77             sub dirall {
78 1     1 1 241 my ($self, $path, $sub) = @_;
79 1         6 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_DIRALL }, $sub);
80             }
81              
82              
83             sub get {
84 2     2 1 2367 my ($self, $path, $sub) = @_;
85 2         15 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_GET }, $sub);
86             }
87              
88              
89             sub dirallslash {
90 1     1 1 1744 my ($self, $path, $sub) = @_;
91 1         52 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_DIRALLSLASH }, $sub);
92             }
93              
94              
95             sub getslash {
96 5     5 1 1191 my ($self, $path, $sub) = @_;
97 5         29 $self->_run_cmd({ data => $path."\0", type => OWNET_MSG_GETSLASH }, $sub);
98             }
99              
100             sub _run_cmd {
101 22     22   41 my $self = shift;
102 22         38 my $cmd = shift;
103              
104 22         30 print STDERR 'Running command, ', $cmd->{type}, "\n" if DEBUG;
105 22 100       98 $self->{cmd_cb} or return $self->connect($cmd, @_);
106 14         45 $self->{cmd_cb}->($cmd, @_);
107             }
108              
109 0     0   0 sub DESTROY { }
110              
111              
112             sub all_cv {
113 39     39 1 59 my $self = shift;
114 39 50       87 $self->{all_cv} = shift if @_;
115 39 100       134 unless ($self->{all_cv}) {
116 5         159 $self->{all_cv} = AnyEvent->condvar;
117             }
118 39         195 $self->{all_cv};
119             }
120              
121              
122             sub cleanup {
123 3     3 1 6 my $self = shift;
124 3         4 print STDERR "cleanup\n" if DEBUG;
125 3 100       31 $self->{all_cv}->croak(@_) if ($self->{all_cv});
126 3         27 while (@{$self->{connect_queue}}) {
  4         24  
127 1         2 my $queue = shift @{$self->{connect_queue}};
  1         3  
128 1         2 my($cv, @args) = @$queue;
129 1         5 $cv->croak(@_);
130             }
131 3         15 delete $self->{all_cv};
132 3         18 delete $self->{cmd_cb};
133 3         17 delete $self->{sock};
134 3 100       31 $self->{on_error}->(@_) if $self->{on_error};
135             }
136              
137              
138             sub connect {
139 8     8 1 13 my $self = shift;
140              
141 8         11 my $cv;
142 8 50       24 if (@_) {
143 8         268 $cv = AnyEvent->condvar;
144 8         46 push @{$self->{connect_queue}}, [ $cv, @_ ];
  8         29  
145             }
146              
147 8 100       57 return $cv if $self->{sock};
148              
149             $self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
150              
151             my $fh = shift
152 6 100   6   878 or do {
153 1         5 my $err = "Can't connect owserver: $!";
154 1         3 $self->cleanup($err);
155 1         3 $cv->croak($err);
156             return
157 1         24 };
158              
159 5         10 warn "Connected\n" if DEBUG;
160              
161             my $hd =
162             AnyEvent::Handle->new(
163             fh => $fh,
164             on_error => sub {
165 1         46 print STDERR "handle error $_[2]\n" if DEBUG;
166 1         4 $_[0]->destroy;
167 1 50       57 if ($_[1]) {
168 1         6 $self->cleanup('Error: '.$_[2]);
169             }
170             },
171             on_eof => sub {
172 0         0 print STDERR "handle eof\n" if DEBUG;
173 0         0 $_[0]->destroy;
174 0         0 $self->cleanup('Connection closed');
175             },
176             on_timeout => sub {
177 1         100713 print STDERR "handle timeout\n" if DEBUG;
178 1         8 $_[0]->destroy;
179 1         183 $self->cleanup('Socket timeout');
180             }
181 5         74 );
182             $self->{cmd_cb} = sub {
183 21         65 $self->all_cv->begin;
184 21         110 my $command = shift;
185              
186 21         32 my ($cv, $cb);
187 21 50       130 if (@_) {
188 21 100       83 $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
189 21 100       61 $cb = pop if ref $_[-1] eq 'CODE';
190             }
191              
192 21         71 my $msg = $self->_msg($command);
193 21         29 print STDERR "sending command ", $command->{type}, "\n" if DEBUG;
194 21         24 warn 'Sending: ', (unpack 'H*', $msg), "\n" if DEBUG;
195              
196 21         105 $hd->push_write($msg);
197 21         2316 $hd->timeout($self->{timeout});
198              
199 21   66     1338 $cv ||= AnyEvent->condvar;
200              
201 21         139 print STDERR "using condvar $cv\n" if DEBUG;
202              
203             $cv->cb(sub {
204 7         64 my $cv = shift;
205 7         9 print STDERR "calling callback $cv\n" if DEBUG;
206             try {
207 7         253 my $res = $cv->recv;
208 7         65 $cb->($res);
209             } catch {
210 0   0     0 ($self->{on_error} || sub { die "ARGH: @_\n"; })->($_);
211             }
212 21 100       97 }) if $cb;
  7         54  
213              
214             $hd->push_read(ref $self, $command => sub {
215 18         32 my($handle, $res, $err) = @_;
216 18         68 $hd->timeout(0);
217 18         265 print STDERR "read finished $cv\n" if DEBUG;
218 18         23 print STDERR "read ",
219             ($cv->ready ? "ready" : "not ready"), "\n" if DEBUG;
220 18         53 $self->all_cv->end;
221 18 50       329 if ($err) {
222 0         0 print STDERR "returning error $err\n" if DEBUG;
223 0         0 return $cv->croak($res)
224             }
225 18         26 print STDERR "Sending $res\n" if DEBUG;
226 18         76 $cv->send($res);
227 21         217 });
228 21         907 return $cv;
229 5         457 };
230              
231 5         11 while (@{$self->{connect_queue}}) {
  12         87  
232 7         12 my $queue = shift @{$self->{connect_queue}};
  7         15  
233 7         15 my($cv, @args) = @$queue;
234 7         19 $self->{cmd_cb}->(@args, $cv);
235             }
236             # $cv->send(1);
237 6         65 };
238              
239 6         2941 return $cv;
240             }
241              
242              
243             sub devices {
244 4     4 1 257 my ($self, $cb, $offset, $cv) = @_;
245 4   100     15 $offset ||= '/';
246 4   66     55 $cv ||= AnyEvent->condvar;
247 4         12 print STDERR "devices: $offset\n" if DEBUG;
248 4         17 $cv->begin;
249             $self->getslash($offset, sub {
250 4     4   5 my $res = shift;
251 4 50       19 if ($res->is_success) {
252 4         13 foreach my $d ($res->data_list) {
253 14 100       65 if ($d =~ m!^.*/[0-9a-f]{2}\.[0-9a-f]{12}/$!i) {
    100          
254 2         6 $cb->($d, $cv);
255 2         9 $self->devices($cb, $d, $cv);
256             } elsif ($d =~ m!/(?:main|aux)/$!) {
257 1         5 $self->devices($cb, $d, $cv);
258             }
259             }
260             } # TOFIX: propogate error?
261 4         13 $cv->end;
262 4         31 });
263 4         11 $cv;
264             }
265              
266              
267             sub device_files {
268 0     0 1 0 my ($self, $cb, $files, $offset, $cv) = @_;
269 0 0       0 $files = [$files] unless (ref $files);
270             $cv = $self->devices(sub {
271 0     0   0 my $dev = shift;
272 0         0 foreach my $file (@$files) {
273 0         0 $cv->begin;
274             $self->get($dev.$file,
275             sub {
276 0         0 my $res = shift;
277 0         0 $cv->end;
278 0         0 my $value = $res->{data};
279 0 0       0 return unless (defined $value);
280 0         0 $cb->($dev, $file, 0+$value);
281 0         0 });
282             }
283 0         0 }, $offset, $cv);
284             }
285              
286              
287             sub anyevent_read_type {
288 21     21 1 261 my ($handle, $cb, $command) = @_;
289              
290 21         33 my $MAX_RETURN = 66000;
291 21         29 my @data;
292             sub {
293 40     40   223278 my $rbuf = \$handle->{rbuf};
294              
295 46 100       135 REDO:
296             return unless (defined $$rbuf);
297 39         53 my $len;
298              
299             my %result;
300 0         0 my $header;
301 39         53 do {
302 39         50 $len = length $$rbuf;
303 39         48 print STDERR "read_type has $len bytes\n" if DEBUG;
304 39         44 print STDERR "read_type has ", (unpack 'H*', $$rbuf), "\n" if DEBUG;
305 39 100       118 return unless ($len >= 24);
306 25         145 @result{qw/version payload ret sg size offset/} = unpack 'N6', $$rbuf;
307 25         68 $header = substr $$rbuf, 0, 24, '';
308 25         26 print STDERR "read_type header ", (unpack 'H*', $header), "\n" if DEBUG;
309 25 100       101 if ($result{'ret'} > $MAX_RETURN) {
310 4         32 $cb->($handle, AnyEvent::OWNet::Response->new(%result));
311 4         152 return 1;
312             }
313             } while ($result{payload} > $MAX_RETURN);
314              
315 21         35 my $total_len = 24 + $result{payload};
316 21         30 print STDERR "read_type have ", $len, " need ", $total_len, "\n" if DEBUG;
317 21 100       55 unless ($len >= $total_len) {
318 1         2 $$rbuf = $header.$$rbuf;
319 1         5 return;
320             }
321              
322 20         43 my $data = substr $$rbuf, 0, $result{payload}, '';
323 20 100       59 if ($command->{type} == OWNET_MSG_DIR) {
324 8 100       16 if ($data eq '') {
325 2         7 $result{data} = \@data;
326             } else {
327 6         15 push @data, substr $data, 0, -1;
328 6         31 goto REDO;
329             }
330             } else {
331 12         23 $result{data} = $data;
332             }
333 14         15 print STDERR "read_type complete\n" if DEBUG;
334 14         6107 $cb->($handle, AnyEvent::OWNet::Response->new(%result));
335 14         1383 return 1;
336             }
337 21         401 }
338              
339             1;
340              
341              
342             __END__