File Coverage

blib/lib/Net/Async/WebSearch.pm
Criterion Covered Total %
statement 258 280 92.1
branch 75 114 65.7
condition 56 109 51.3
subroutine 39 44 88.6
pod 12 12 100.0
total 440 559 78.7


line stmt bran cond sub pod time code
1             package Net::Async::WebSearch;
2             # ABSTRACT: IO::Async multi-provider web search aggregator
3 6     6   771158 use strict;
  6         9  
  6         185  
4 6     6   23 use warnings;
  6         14  
  6         271  
5 6     6   1234 use parent 'IO::Async::Notifier';
  6         837  
  6         29  
6              
7 6     6   44837 use Carp qw( croak );
  6         12  
  6         258  
8 6     6   25 use Future ();
  6         8  
  6         88  
9 6     6   2570 use Future::Utils qw( fmap_void );
  6         10957  
  6         448  
10 6     6   2469 use HTTP::Request::Common qw( GET );
  6         81766  
  6         436  
11 6     6   3151 use Net::Async::HTTP ();
  6         373836  
  6         186  
12 6     6   46 use URI ();
  6         7  
  6         69  
13              
14 6     6   2728 use Net::Async::WebSearch::Provider ();
  6         19  
  6         107  
15 6     6   2469 use Net::Async::WebSearch::Result ();
  6         19  
  6         18111  
16              
17             our $VERSION = '0.002';
18              
19             # Reciprocal Rank Fusion constant (Cormack et al.)
20             our $RRF_K = 60;
21              
22             sub _init {
23 21     21   265044 my ( $self, $args ) = @_;
24 21         78 $self->SUPER::_init($args);
25 21         67 $self->{providers} = [];
26 21         62 $self->{http} = delete $args->{http};
27 21   33     88 $self->{rrf_k} = delete $args->{rrf_k} // $RRF_K;
28 21   50     67 $self->{default_limit} = delete $args->{default_limit} // 10;
29 21   50     59 $self->{per_provider_limit} = delete $args->{per_provider_limit} // 10;
30 21   50     54 $self->{fetch_concurrency} = delete $args->{fetch_concurrency} // 100;
31             $self->{fetch_concurrency_per_target_ip}
32 21   50     79 = delete $args->{fetch_concurrency_per_target_ip} // 5;
33 21         33 $self->{fetch_timeout} = delete $args->{fetch_timeout};
34 21         32 $self->{fetch_max_bytes} = delete $args->{fetch_max_bytes};
35 21         28 $self->{fetch_user_agent} = delete $args->{fetch_user_agent};
36 21 100       44 if ( my $provs = delete $args->{providers} ) {
37 9         31 $self->add_provider($_) for @$provs;
38             }
39 21         35 return;
40             }
41              
42             sub configure_unknown {
43 0     0 1 0 my ( $self, %args ) = @_;
44 0         0 for my $k (qw(
45             http rrf_k default_limit per_provider_limit providers
46             fetch_concurrency fetch_concurrency_per_target_ip
47             fetch_timeout fetch_max_bytes fetch_user_agent
48             )) {
49 0         0 delete $args{$k};
50             }
51 0 0       0 return unless %args;
52 0         0 croak "Unknown configuration keys: ".join(',', sort keys %args);
53             }
54              
55 1     1 1 1173 sub providers { @{ $_[0]->{providers} } }
  1         3  
56 26 50   26 1 80 sub rrf_k { @_ > 1 ? ($_[0]->{rrf_k} = $_[1]) : $_[0]->{rrf_k} }
57 25 50   25 1 79 sub default_limit { @_ > 1 ? ($_[0]->{default_limit} = $_[1]) : $_[0]->{default_limit} }
58 30 50   30 1 139 sub per_provider_limit{ @_ > 1 ? ($_[0]->{per_provider_limit} = $_[1]) : $_[0]->{per_provider_limit} }
59              
60             sub add_provider {
61 28     28 1 151 my ( $self, $p ) = @_;
62 28 50 33     135 croak "provider must be a Net::Async::WebSearch::Provider"
63             unless ref $p && $p->isa('Net::Async::WebSearch::Provider');
64 28         30 my %taken = map { $_->name => 1 } @{ $self->{providers} };
  18         25  
  28         57  
65 28 100       103 if ( $taken{ $p->name } ) {
66 1         3 my $base = $p->name;
67 1         3 my $n = 2;
68 1         4 $n++ while $taken{ "$base#$n" };
69 1         2 $p->name( "$base#$n" );
70             }
71 28         51 push @{ $self->{providers} }, $p;
  28         80  
72 28         45 return $p;
73             }
74              
75             sub provider {
76 0     0 1 0 my ( $self, $name ) = @_;
77 0         0 for my $p ( @{ $self->{providers} } ) {
  0         0  
78 0 0       0 return $p if $p->name eq $name;
79             }
80 0         0 return;
81             }
82              
83             sub providers_matching {
84 2     2 1 5623 my ( $self, $sel ) = @_;
85 2         3 return grep { $_->matches($sel) } @{ $self->{providers} };
  12         36  
  2         4  
86             }
87              
88             sub http {
89 55     55 1 74 my ( $self ) = @_;
90 55 100       207 return $self->{http} if $self->{http};
91             my $http = Net::Async::HTTP->new(
92             user_agent => 'Net-Async-WebSearch/'.$VERSION,
93 2   50     32 max_connections_per_host => $self->{fetch_concurrency_per_target_ip} // 5,
94             max_in_flight => 0,
95             );
96 2         176 $self->add_child($http);
97 2         154 return $self->{http} = $http;
98             }
99              
100             sub _on_added_to_loop {
101 0     0   0 my ( $self, $loop ) = @_;
102 0 0       0 $self->SUPER::_on_added_to_loop($loop) if $self->can('SUPER::_on_added_to_loop');
103 0         0 $self->http;
104             }
105              
106             #----------------------------------------------------------------------
107             # Provider selection
108             #----------------------------------------------------------------------
109              
110             sub _select_providers {
111 30     30   97 my ( $self, %args ) = @_;
112 30         40 my $only = $args{only};
113 30         39 my $exclude = $args{exclude};
114 30         33 my @sel;
115 30         33 for my $p ( @{ $self->{providers} } ) {
  30         67  
116 64 100       133 next unless $p->enabled;
117 63 100 66     129 if ( $only && @$only ) {
118 34 100       41 next unless grep { $p->matches($_) } @$only;
  36         73  
119             }
120 48 100 66     103 if ( $exclude && @$exclude ) {
121 18 100       23 next if grep { $p->matches($_) } @$exclude;
  18         26  
122             }
123 41         59 push @sel, $p;
124             }
125 30         81 return @sel;
126             }
127              
128             #----------------------------------------------------------------------
129             # URL normalization for dedup
130             #----------------------------------------------------------------------
131              
132             sub _normalize_url {
133 79     79   111 my ( $self, $url ) = @_;
134 79 50 33     236 return '' unless defined $url && length $url;
135 79 50       86 my $u = eval { URI->new($url)->canonical } or return lc $url;
  79         172  
136 79         9759 my $s = $u->as_string;
137 79         243 $s =~ s{#.*$}{};
138 79         176 $s =~ s{/+$}{};
139 79         159 return lc $s;
140             }
141              
142             #----------------------------------------------------------------------
143             # Fetch (optional page-body retrieval after search)
144             #----------------------------------------------------------------------
145              
146             sub _fetch_one {
147 12     12   36 my ( $self, $result, %args ) = @_;
148 12         29 my $url = $result->url;
149 12 50 33     42 return Future->done unless defined $url && length $url;
150 12         32 my $req = GET($url);
151             $req->header( 'User-Agent' => $args{user_agent} // $self->{fetch_user_agent}
152 12   66     1471 // 'Net-Async-WebSearch/'.$VERSION );
      33        
153 12   50     618 $req->header( 'Accept' => $args{accept} // '*/*' );
154              
155 12         416 my %req_args = ( request => $req );
156 12 50       36 if ( defined $args{timeout} ) {
157 0         0 $req_args{timeout} = $args{timeout};
158             }
159             # NB: max_bytes is enforced on the decoded body below — Net::Async::HTTP
160             # does not cap the on-the-wire body length itself.
161              
162             return $self->http->do_request(%req_args)->then(sub {
163 11     11   2963 my ( $resp ) = @_;
164 11         42 my $ct = $resp->header('Content-Type');
165 11 50 33     459 my $charset = ( $ct && $ct =~ /charset=([^\s;]+)/ ) ? lc $1 : undef;
166 11         51 my $body = eval { $resp->decoded_content };
  11         32  
167 11 50       4672 $body = $resp->content if !defined $body;
168 11 50 66     38 if ( defined $args{max_bytes} && defined $body && length($body) > $args{max_bytes} ) {
      66        
169 1         3 $body = substr($body, 0, $args{max_bytes});
170             }
171             $result->fetched({
172 11 100       69 ok => $resp->is_success ? 1 : 0,
    50          
    100          
173             status => $resp->code,
174             status_line => $resp->status_line,
175             final_url => ( $resp->request ? $resp->request->uri.'' : $url ),
176             content_type => $ct,
177             charset => $charset,
178             body => $body,
179             error => $resp->is_success ? undef : $resp->status_line,
180             });
181 11         40 Future->done;
182             })->else(sub {
183 1     1   178 my ( $err ) = @_;
184 1         8 $result->fetched({
185             ok => 0,
186             status => undef,
187             status_line => undef,
188             final_url => $url,
189             content_type => undef,
190             charset => undef,
191             body => undef,
192             error => "$err",
193             });
194 1         2 Future->done;
195 12         26 });
196             }
197              
198             sub _fetch_results {
199 6     6   20 my ( $self, $results_ref, %args ) = @_;
200 6 50       66 my $n = $args{fetch} or return Future->done($results_ref);
201 6 50 33     21 return Future->done($results_ref) unless $results_ref && @$results_ref;
202 6 100       14 my $cap = $n < @$results_ref ? $n : scalar @$results_ref;
203 6         42 my @targets = @{$results_ref}[ 0 .. $cap - 1 ];
  6         16  
204 6   33     29 my $conc = $args{fetch_concurrency} // $self->{fetch_concurrency} // 100;
      50        
205             my %fargs = (
206             timeout => $args{fetch_timeout} // $self->{fetch_timeout},
207             max_bytes => $args{fetch_max_bytes} // $self->{fetch_max_bytes},
208             user_agent => $args{fetch_user_agent} // $self->{fetch_user_agent},
209             accept => $args{fetch_accept},
210 6   33     59 );
      66        
      66        
211 6         9 my $on_fetch = $args{on_fetch};
212             return fmap_void(
213             sub {
214 10     10   531 my $r = shift;
215             $self->_fetch_one($r, %fargs)->on_done(sub {
216 10 50       839 $on_fetch->($r) if $on_fetch;
217 10         46 });
218             },
219             foreach => \@targets,
220             concurrent => $conc,
221 6     6   40 )->then(sub { Future->done($results_ref) });
  6         941  
222             }
223              
224             #----------------------------------------------------------------------
225             # Core dispatch
226             #----------------------------------------------------------------------
227              
228             sub _dispatch {
229 30     30   71 my ( $self, %args ) = @_;
230 30         40 my $query = $args{query};
231 30 50 33     97 croak "search requires 'query'" unless defined $query && length $query;
232              
233 30         103 my @providers = $self->_select_providers( only => $args{only}, exclude => $args{exclude} );
234 30 50       96 return ( [], [] ) unless @providers;
235              
236             my %base = (
237             limit => $args{per_provider_limit} // $self->per_provider_limit,
238             language => $args{language},
239             region => $args{region},
240             safesearch => $args{safesearch},
241 30   33     89 );
242              
243 30   100     113 my $popts = $args{provider_opts} || {};
244 30         40 my @futures;
245 30         38 for my $p (@providers) {
246 41         340 my %merged = %base;
247             # Apply provider_opts in insertion order: class-leaf / tag first,
248             # exact name last so it wins.
249 41         118 for my $sel ( sort { ($a eq $p->name) <=> ($b eq $p->name) } keys %$popts ) {
  3         7  
250 6 100       13 next unless $p->matches($sel);
251 4         8 %merged = ( %merged, %{ $popts->{$sel} } );
  4         17  
252             }
253 41         95 push @futures, {
254             provider => $p,
255             future => $p->search( $self->http, $query, \%merged ),
256             };
257             }
258 30         1035 return ( \@providers, \@futures );
259             }
260              
261             #----------------------------------------------------------------------
262             # Mode: collect
263             #----------------------------------------------------------------------
264              
265             sub search {
266 29     29 1 23565 my ( $self, %args ) = @_;
267 29   100     116 $args{mode} ||= 'collect';
268 29 100       59 return $self->search_stream(%args) if $args{mode} eq 'stream';
269 28 100       64 return $self->search_race(%args) if $args{mode} eq 'race';
270              
271 26         63 my ( $provs, $futs ) = $self->_dispatch(%args);
272 26 50 33     103 return Future->done({ results => [], errors => [], stats => { providers => 0 } })
273             unless $futs && @$futs;
274              
275 26   66     83 my $limit = $args{limit} // $self->default_limit;
276 26         52 my $k = $self->rrf_k;
277              
278             my @wrapped = map {
279 26         48 my $name = $_->{provider}->name;
  35         375  
280             $_->{future}->else(sub {
281 3     3   98 my @err = @_;
282 3         13 Future->done({ __error => 1, provider => $name, error => $err[0] });
283 35         180 });
284             } @$futs;
285              
286             return Future->needs_all(@wrapped)->then(sub {
287 26     26   3963 my @per_provider = @_;
288 26         42 my @errors;
289             my %agg; # normalized_url => { result => $first, score => $n, providers => {p=>rank,...} }
290 26         40 for my $payload (@per_provider) {
291 35 50 66     121 if ( ref $payload eq 'HASH' && $payload->{__error} ) {
292 3         24 push @errors, { provider => $payload->{provider}, error => $payload->{error} };
293 3         8 next;
294             }
295 32 50       51 for my $r ( @{ $payload || [] } ) {
  32         72  
296 70         132 my $key = $self->_normalize_url( $r->url );
297 70 50       130 next unless length $key;
298 70   100     357 my $slot = $agg{$key} ||= { result => $r, score => 0, providers => {} };
299 70         126 $slot->{providers}{ $r->provider } = $r->rank;
300 70         118 $slot->{score} += 1 / ( $k + $r->rank );
301             # Prefer a result that has a snippet, if the first had none.
302 70 50 66     122 if ( !$slot->{result}->snippet && $r->snippet ) {
303 0         0 $slot->{result} = $r;
304             }
305             }
306             }
307             my @merged =
308             map {
309 63         66 my $s = $_;
310 63         125 $s->{result}->score( $s->{score} );
311 63         61 $s->{result}->extra->{providers} = { %{ $s->{providers} } };
  63         157  
312 63         119 $s->{result};
313             }
314 26         90 sort { $b->{score} <=> $a->{score} }
  67         113  
315             values %agg;
316              
317 26 100       57 @merged = splice @merged, 0, $limit if @merged > $limit;
318              
319 26         143 my $final = {
320             results => \@merged,
321             errors => \@errors,
322             stats => {
323             providers => scalar @$provs,
324             providers_ok => ( scalar @$provs ) - scalar @errors,
325             providers_error => scalar @errors,
326             merged => scalar @merged,
327             },
328             };
329              
330 26 100       55 if ( $args{fetch} ) {
331             return $self->_fetch_results(\@merged, %args)->then(sub {
332 5         281 $final->{stats}{fetched} = scalar grep { $_->fetched } @merged;
  15         31  
333 5         9 Future->done($final);
334 5         17 });
335             }
336 21         76 return Future->done($final);
337 26         936 });
338             }
339              
340             #----------------------------------------------------------------------
341             # Mode: stream — fire on_result per result as soon as each provider arrives
342             #----------------------------------------------------------------------
343              
344             sub search_stream {
345 2     2 1 1961 my ( $self, %args ) = @_;
346 2 50       7 my $cb = $args{on_result} or croak "stream mode requires 'on_result' coderef";
347 2         4 my $on_provider_done = $args{on_provider_done};
348 2         3 my $on_provider_error = $args{on_provider_error};
349 2         3 my $on_fetch = $args{on_fetch};
350              
351 2         6 my ( $provs, $futs ) = $self->_dispatch(%args);
352 2 50 33     12 return Future->done({ results => [], errors => [], stats => { providers => 0 } })
353             unless $futs && @$futs;
354              
355 2         3 my @all;
356             my @errors;
357 2         3 my $seen_key = {};
358              
359 2   100     9 my $fetch_cap = $args{fetch} // 0;
360 2         2 my $fetch_started = 0;
361 2         4 my @fetch_futures;
362             my %fargs = (
363             timeout => $args{fetch_timeout} // $self->{fetch_timeout},
364             max_bytes => $args{fetch_max_bytes} // $self->{fetch_max_bytes},
365             user_agent => $args{fetch_user_agent} // $self->{fetch_user_agent},
366             accept => $args{fetch_accept},
367 2   33     23 );
      33        
      33        
368              
369 2         4 my @wrapped;
370 2         4 for my $entry (@$futs) {
371 3         9 my $name = $entry->{provider}->name;
372             my $f = $entry->{future}->then(sub {
373 3     3   122 my ( $results ) = @_;
374 3         7 for my $r (@$results) {
375 9         43 my $key = $self->_normalize_url( $r->url );
376 9 50       17 next unless length $key;
377 9 100       25 next if $seen_key->{$key}++;
378 7         8 push @all, $r;
379 7         20 $cb->($r);
380 7 100 100     23 if ( $fetch_cap && $fetch_started < $fetch_cap ) {
381 2         3 $fetch_started++;
382             push @fetch_futures, $self->_fetch_one($r, %fargs)->on_done(sub {
383 2 50       132 $on_fetch->($r) if $on_fetch;
384 2         7 });
385             }
386             }
387 3 50       9 $on_provider_done->( $name, $results ) if $on_provider_done;
388 3         10 Future->done;
389             })->else(sub {
390 0     0   0 my ( $err ) = @_;
391 0         0 push @errors, { provider => $name, error => $err };
392 0 0       0 $on_provider_error->( $name, $err ) if $on_provider_error;
393 0         0 Future->done;
394 3         18 });
395 3         191 push @wrapped, $f;
396             }
397              
398             return Future->needs_all(@wrapped)->then(sub {
399             # Wait for any pending fetches the stream kicked off.
400             return @fetch_futures
401 2 100   2   256 ? Future->needs_all(@fetch_futures)
402             : Future->done;
403             })->then(sub {
404             Future->done({
405             results => \@all,
406             errors => \@errors,
407             stats => {
408             providers => scalar @$provs,
409             providers_ok => ( scalar @$provs ) - scalar @errors,
410             providers_error => scalar @errors,
411             emitted => scalar @all,
412 2 100   2   158 ( $fetch_cap ? ( fetched => scalar grep { $_->fetched } @all ) : () ),
  3         5  
413             },
414             });
415 2         7 });
416             }
417              
418             #----------------------------------------------------------------------
419             # Mode: race — resolve with whichever provider comes back first (non-error)
420             #----------------------------------------------------------------------
421              
422             sub search_race {
423 2     2 1 8 my ( $self, %args ) = @_;
424 2         6 my ( $provs, $futs ) = $self->_dispatch(%args);
425 2 50 33     12 return Future->done({ results => [], errors => [], stats => { providers => 0 } })
426             unless $futs && @$futs;
427              
428 2   33     9 my $limit = $args{limit} // $self->default_limit;
429              
430             # Wrap each so we can distinguish "first success" from "first completion".
431 2         5 my $winner = Future->new;
432 2         10 my @errors;
433             my @remaining = map {
434 2         4 my $name = $_->{provider}->name;
  3         8  
435 3         6 my $f = $_->{future};
436             $f->on_done(sub {
437 3     3   43 my ( $results ) = @_;
438 3 100       6 return if $winner->is_ready;
439 2         10 my @top = @$results;
440 2 50       5 @top = splice @top, 0, $limit if @top > $limit;
441 2         13 $winner->done({
442             provider => $name,
443             results => \@top,
444             errors => \@errors,
445             stats => {
446             providers => scalar @$provs,
447             winning => $name,
448             },
449             });
450 3         25 });
451             $f->on_fail(sub {
452 0     0   0 my ( $err ) = @_;
453 0         0 push @errors, { provider => $name, error => $err };
454 3         77 });
455 3         36 $f;
456             } @$futs;
457              
458             # If everyone fails, resolve with errors.
459             Future->wait_all(@remaining)->on_done(sub {
460 2 50   2   153 return if $winner->is_ready;
461 0         0 $winner->done({
462             provider => undef,
463             results => [],
464             errors => \@errors,
465             stats => {
466             providers => scalar @$provs,
467             providers_error => scalar @errors,
468             },
469             });
470 2         13 });
471              
472 2 100       63 return $winner unless $args{fetch};
473             return $winner->then(sub {
474 1     1   44 my ( $out ) = @_;
475 1 50       2 return Future->done($out) unless @{ $out->{results} };
  1         2  
476             $self->_fetch_results( $out->{results}, %args )->then(sub {
477 1         50 $out->{stats}{fetched} = scalar grep { $_->fetched } @{ $out->{results} };
  3         7  
  1         3  
478 1         3 Future->done($out);
479 1         5 });
480 1         7 });
481             }
482              
483             1;
484              
485             __END__