File Coverage

blib/lib/AnyEvent/FCP.pm
Criterion Covered Total %
statement 20 102 19.6
branch 0 22 0.0
condition 0 22 0.0
subroutine 6 22 27.2
pod 1 7 14.2
total 27 175 15.4


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::FCP - freenet client protocol 2.0
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::FCP;
8              
9             my $fcp = new AnyEvent::FCP;
10              
11             # transactions return condvars
12             my $lp_cv = $fcp->list_peers;
13             my $pr_cv = $fcp->list_persistent_requests;
14              
15             my $peers = $lp_cv->recv;
16             my $reqs = $pr_cv->recv;
17              
18             =head1 DESCRIPTION
19              
20             This module implements the freenet client protocol version 2.0, as used by
21             freenet 0.7. See L for the earlier freenet 0.5 version.
22              
23             See L for a
24             description of what the messages do.
25              
26             The module uses L to find a suitable event module.
27              
28             Only very little is implemented, ask if you need more, and look at the
29             example program later in this section.
30              
31             =head2 EXAMPLE
32              
33             This example fetches the download list and sets the priority of all files
34             with "a" in their name to "emergency":
35              
36             use AnyEvent::FCP;
37              
38             my $fcp = new AnyEvent::FCP;
39              
40             $fcp->watch_global_sync (1, 0);
41             my $req = $fcp->list_persistent_requests_sync;
42              
43             for my $req (values %$req) {
44             if ($req->{filename} =~ /a/) {
45             $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46             }
47             }
48              
49             =head2 IMPORT TAGS
50              
51             Nothing much can be "imported" from this module right now.
52              
53             =head2 THE AnyEvent::FCP CLASS
54              
55             =over 4
56              
57             =cut
58              
59             package AnyEvent::FCP;
60              
61 1     1   1342 use common::sense;
  1         8  
  1         5  
62              
63 1     1   49 use Carp;
  1         2  
  1         102  
64              
65             our $VERSION = '0.3';
66              
67 1     1   5 use Scalar::Util ();
  1         5  
  1         17  
68              
69 1     1   1809 use AnyEvent;
  1         6087  
  1         32  
70 1     1   1249 use AnyEvent::Handle;
  1         30283  
  1         3734  
71              
72             sub touc($) {
73 0     0 0 0 local $_ = shift;
74 0         0 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/;
75 0         0 s/(?:^|_)(.)/\U$1/g;
76 0         0 $_
77             }
78              
79             sub tolc($) {
80 0     0 0 0 local $_ = shift;
81 0         0 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i;
82 0         0 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i;
83 0         0 s/(?<=[a-z])(?=[A-Z])/_/g;
84             lc
85 0         0 }
86              
87             =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
88              
89             Create a new FCP connection to the given host and port (default
90             127.0.0.1:9481, or the environment variables C and C).
91              
92             If no C was specified, then AnyEvent::FCP will generate a
93             (hopefully) unique client name for you.
94              
95             You can install a progress callback that is being called with the AnyEvent::FCP
96             object, the type, a hashref with key-value pairs and a reference to any received data,
97             for all unsolicited messages.
98              
99             Example:
100              
101             sub progress_cb {
102             my ($self, $type, $kv, $rdata) = @_;
103              
104             if ($type eq "simple_progress") {
105             warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
106             }
107             }
108              
109             =cut
110              
111             sub new {
112 0     0 1 0 my $class = shift;
113 0         0 my $self = bless { @_ }, $class;
114              
115 0   0     0 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
      0        
116 0   0     0 $self->{port} ||= $ENV{FREDPORT} || 9481;
      0        
117 0   0     0 $self->{name} ||= time.rand.rand.rand; # lame
118 0   0     0 $self->{timeout} ||= 600;
119 0   0 0   0 $self->{progress} ||= sub { };
  0         0  
120              
121 0         0 $self->{id} = "a0";
122              
123             {
124 0         0 Scalar::Util::weaken (my $self = $self);
  0         0  
125              
126             $self->{hdl} = new AnyEvent::Handle
127             connect => [$self->{host} => $self->{port}],
128             timeout => $self->{timeout},
129             on_error => sub {
130 0     0   0 warn "<@_>\n";
131 0         0 exit 1;
132             },
133 0     0   0 on_read => sub { $self->on_read (@_) },
134 0   0 0   0 on_eof => $self->{on_eof} || sub { };
  0         0  
135              
136 0         0 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137             }
138              
139             $self->send_msg (
140 0         0 client_hello =>
141             name => $self->{name},
142             expected_version => "2.0",
143             );
144              
145 0         0 $self
146             }
147              
148             sub send_msg {
149 0     0 0 0 my ($self, $type, %kv) = @_;
150              
151 0         0 my $data = delete $kv{data};
152              
153 0 0       0 if (exists $kv{id_cb}) {
154 0   0     0 my $id = $kv{identifier} || ++$self->{id};
155 0         0 $self->{id}{$id} = delete $kv{id_cb};
156 0         0 $kv{identifier} = $id;
157             }
158              
159 0         0 my $msg = (touc $type) . "\012"
160             . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161              
162             sub id {
163 0     0 0 0 my ($self) = @_;
164              
165              
166             }
167              
168 0 0       0 if (defined $data) {
169 0         0 $msg .= "DataLength=" . (length $data) . "\012"
170             . "Data\012$data";
171             } else {
172 0         0 $msg .= "EndMessage\012";
173             }
174              
175 0         0 $self->{hdl}->push_write ($msg);
176             }
177              
178             sub on_read {
179 0     0 0 0 my ($self) = @_;
180              
181 0         0 my $type;
182             my %kv;
183 0         0 my $rdata;
184              
185             my $done_cb = sub {
186 0     0   0 $kv{pkt_type} = $type;
187              
188 0 0       0 if (my $cb = $self->{queue}[0]) {
189 0         0 $cb->($self, $type, \%kv, $rdata)
190 0 0       0 and shift @{ $self->{queue} };
191             } else {
192 0         0 $self->default_recv ($type, \%kv, $rdata);
193             }
194 0         0 };
195              
196 0         0 my $hdr_cb; $hdr_cb = sub {
197 0 0   0   0 if ($_[1] =~ /^([^=]+)=(.*)$/) {
    0          
    0          
198 0         0 my ($k, $v) = ($1, $2);
199 0         0 my @k = split /\./, tolc $k;
200 0         0 my $ro = \\%kv;
201              
202 0         0 while (@k) {
203 0         0 my $k = shift @k;
204 0 0       0 if ($k =~ /^\d+$/) {
205 0         0 $ro = \$$ro->[$k];
206             } else {
207 0         0 $ro = \$$ro->{$k};
208             }
209             }
210              
211 0         0 $$ro = $v;
212              
213 0         0 $_[0]->push_read (line => $hdr_cb);
214             } elsif ($_[1] eq "Data") {
215             $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216 0         0 $rdata = \$_[1];
217 0         0 $done_cb->();
218 0         0 });
219             } elsif ($_[1] eq "EndMessage") {
220 0         0 $done_cb->();
221             } else {
222 0         0 die "protocol error, expected message end, got $_[1]\n";#d#
223             }
224 0         0 };
225              
226             $self->{hdl}->push_read (line => sub {
227 0     0   0 $type = tolc $_[1];
228 0         0 $_[0]->push_read (line => $hdr_cb);
229 0         0 });
230             }
231              
232             sub default_recv {
233 0     0 0 0 my ($self, $type, $kv, $rdata) = @_;
234              
235 0 0       0 if ($type eq "node_hello") {
    0          
236 0         0 $self->{node_hello} = $kv;
237             } elsif (exists $self->{id}{$kv->{identifier}}) {
238 0 0       0 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239             and delete $self->{id}{$kv->{identifier}};
240             } else {
241 0         0 &{ $self->{progress} };
  0         0  
242             }
243             }
244              
245             sub _txn {
246 8     8   16 my ($name, $sub) = @_;
247              
248 8         37 *{$name} = sub {
249 0     0   0 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
250 0         0 &$sub;
251 0         0 $cv
252 8         23 };
253              
254 8         40 *{"$name\_sync"} = sub {
255 0     0     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
256 0           &$sub;
257 0           $cv->recv
258 8         23 };
259             }
260              
261             =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
262              
263             =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
264              
265             =cut
266              
267             _txn list_peers => sub {
268             my ($self, $cv, $with_metadata, $with_volatile) = @_;
269              
270             my @res;
271              
272             $self->send_msg (list_peers =>
273             with_metadata => $with_metadata ? "true" : "false",
274             with_volatile => $with_volatile ? "true" : "false",
275             id_cb => sub {
276             my ($self, $type, $kv, $rdata) = @_;
277              
278             if ($type eq "end_list_peers") {
279             $cv->(\@res);
280             1
281             } else {
282             push @res, $kv;
283             0
284             }
285             },
286             );
287             };
288              
289             =item $cv = $fcp->list_peer_notes ($node_identifier)
290              
291             =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
292              
293             =cut
294              
295             _txn list_peer_notes => sub {
296             my ($self, $cv, $node_identifier) = @_;
297              
298             $self->send_msg (list_peer_notes =>
299             node_identifier => $node_identifier,
300             id_cb => sub {
301             my ($self, $type, $kv, $rdata) = @_;
302              
303             $cv->($kv);
304             1
305             },
306             );
307             };
308              
309             =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310              
311             =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
312              
313             =cut
314              
315             _txn watch_global => sub {
316             my ($self, $cv, $enabled, $verbosity_mask) = @_;
317              
318             $self->send_msg (watch_global =>
319             enabled => $enabled ? "true" : "false",
320             defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321             );
322              
323             $cv->();
324             };
325              
326             =item $cv = $fcp->list_persistent_requests
327              
328             =item $reqs = $fcp->list_persistent_requests_sync
329              
330             =cut
331              
332             _txn list_persistent_requests => sub {
333             my ($self, $cv) = @_;
334              
335             my %res;
336              
337             $self->send_msg ("list_persistent_requests");
338              
339             push @{ $self->{queue} }, sub {
340             my ($self, $type, $kv, $rdata) = @_;
341              
342             if ($type eq "end_list_persistent_requests") {
343             $cv->(\%res);
344             1
345             } else {
346             my $id = $kv->{identifier};
347              
348             if ($type =~ /^persistent_(get|put|put_dir)$/) {
349             $res{$id} = {
350             type => $1,
351             %{ $res{$id} },
352             %$kv,
353             };
354             } elsif ($type eq "simple_progress") {
355             delete $kv->{pkt_type}; # save memory
356             push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357             } else {
358             $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359             }
360             0
361             }
362             };
363             };
364              
365             =item $cv = $fcp->remove_request ($global, $identifier)
366              
367             =item $status = $fcp->remove_request_sync ($global, $identifier)
368              
369             =cut
370              
371             _txn remove_request => sub {
372             my ($self, $cv, $global, $identifier) = @_;
373              
374             $self->send_msg (remove_request =>
375             global => $global ? "true" : "false",
376             identifier => $identifier,
377             id_cb => sub {
378             my ($self, $type, $kv, $rdata) = @_;
379              
380             $cv->($kv);
381             1
382             },
383             );
384             };
385              
386             =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387              
388             =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389              
390             =cut
391              
392             _txn modify_persistent_request => sub {
393             my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394              
395             $self->send_msg (modify_persistent_request =>
396             global => $global ? "true" : "false",
397             defined $client_token ? (client_token => $client_token ) : (),
398             defined $priority_class ? (priority_class => $priority_class) : (),
399             identifier => $identifier,
400             id_cb => sub {
401             my ($self, $type, $kv, $rdata) = @_;
402              
403             $cv->($kv);
404             1
405             },
406             );
407             };
408              
409             =item $cv = $fcp->get_plugin_info ($name, $detailed)
410              
411             =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
412              
413             =cut
414              
415             _txn get_plugin_info => sub {
416             my ($self, $cv, $name, $detailed) = @_;
417              
418             $self->send_msg (get_plugin_info =>
419             plugin_name => $name,
420             detailed => $detailed ? "true" : "false",
421             id_cb => sub {
422             my ($self, $type, $kv, $rdata) = @_;
423              
424             $cv->($kv);
425             1
426             },
427             );
428             };
429              
430             =item $cv = $fcp->client_get ($uri, $identifier, %kv)
431              
432             =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
433              
434             %kv can contain (L).
435              
436             ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437             priority_class, persistence, client_token, global, return_type,
438             binary_blob, allowed_mime_types, filename, temp_filename
439              
440             =cut
441              
442             _txn client_get => sub {
443             my ($self, $cv, $uri, $identifier, %kv) = @_;
444              
445             $self->send_msg (client_get =>
446             %kv,
447             uri => $uri,
448             identifier => $identifier,
449             id_cb => sub {
450             my ($self, $type, $kv, $rdata) = @_;
451              
452             $cv->($kv);
453             1
454             },
455             );
456             };
457              
458             =back
459              
460             =head1 EXAMPLE PROGRAM
461              
462             use AnyEvent::FCP;
463              
464             my $fcp = new AnyEvent::FCP;
465              
466             # let us look at the global request list
467             $fcp->watch_global (1, 0);
468              
469             # list them, synchronously
470             my $req = $fcp->list_persistent_requests_sync;
471              
472             # go through all requests
473             for my $req (values %$req) {
474             # skip jobs not directly-to-disk
475             next unless $req->{return_type} eq "disk";
476             # skip jobs not issued by FProxy
477             next unless $req->{identifier} =~ /^FProxy:/;
478              
479             if ($req->{data_found}) {
480             # file has been successfully downloaded
481            
482             ... move the file away
483             (left as exercise)
484              
485             # remove the request
486              
487             $fcp->remove_request (1, $req->{identifier});
488             } elsif ($req->{get_failed}) {
489             # request has failed
490             if ($req->{get_failed}{code} == 11) {
491             # too many path components, should restart
492             } else {
493             # other failure
494             }
495             } else {
496             # modify priorities randomly, to improve download rates
497             $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
498             if 0.1 > rand;
499             }
500             }
501              
502             # see if the dummy plugin is loaded, to ensure all previous requests have finished.
503             $fcp->get_plugin_info_sync ("dummy");
504              
505             =head1 SEE ALSO
506              
507             L, L.
508              
509             =head1 BUGS
510              
511             =head1 AUTHOR
512              
513             Marc Lehmann
514             http://home.schmorp.de/
515              
516             =cut
517              
518             1
519