File Coverage

blib/lib/AnyEvent/Net/Curl/Queued.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package AnyEvent::Net::Curl::Queued;
2             # ABSTRACT: Moo wrapper for queued downloads via Net::Curl & AnyEvent
3              
4              
5 14     14   199223 use strict;
  14         33  
  14         6675  
6 14     14   75 use utf8;
  14         26  
  14         103  
7 14     14   371 use warnings qw(all);
  14         27  
  14         568  
8              
9 14     14   29089 use AnyEvent;
  14         102318  
  14         512  
10 14     14   135 use Carp qw(confess);
  14         25  
  14         1031  
11 14     14   16006 use Moo;
  14         251555  
  14         86  
12 14         2160 use MooX::Types::MooseLike::Base qw(
13             AnyOf
14             ArrayRef
15             Bool
16             HashRef
17             InstanceOf
18             Int
19             Num
20             Object
21             Str
22             is_Int
23 14     14   48275 );
  14         108601  
24 14     14   15310 use Net::Curl::Share;
  0            
  0            
25              
26             use AnyEvent::Net::Curl::Queued::Multi;
27              
28             our $VERSION = '0.047'; # VERSION
29              
30              
31             has allow_dups => (is => 'ro', isa => Bool, default => sub { 0 });
32              
33              
34             has common_opts => (is => 'ro', isa => HashRef, default => sub { {} });
35              
36              
37             has http_response => (is => 'ro', isa => Bool, default => sub { 0 });
38              
39              
40             has completed => (
41             is => 'ro',
42             isa => Int,
43             default => sub { 0 },
44             writer => 'set_completed',
45             );
46              
47             sub inc_completed {
48             my ($self) = @_;
49             return $self->set_completed($self->completed + 1);
50             }
51              
52              
53             has cv => (is => 'ro', isa => Object, default => sub { AE::cv }, lazy => 1, writer => 'set_cv');
54              
55              
56             has max => (
57             is => 'rw',
58             isa => Int,
59             coerce => sub {
60             confess 'At least 1 connection required'
61             if not is_Int($_[0])
62             or $_[0] < 1;
63             return $_[0];
64             },
65             default => sub { 4 },
66             );
67              
68              
69             has multi => (is => 'ro', isa => InstanceOf['AnyEvent::Net::Curl::Queued::Multi'], writer => 'set_multi');
70              
71              
72             has queue => (
73             is => 'ro',
74             isa => ArrayRef[Object],
75             default => sub { [] },
76             );
77              
78             ## no critic (RequireArgUnpacking)
79              
80             sub queue_push { return 0 + push @{shift->queue}, @_ }
81             sub queue_unshift { return 0 + unshift @{shift->queue}, @_ }
82             sub dequeue { return shift @{shift->queue} }
83             sub count { return 0 + @{shift->queue} }
84              
85              
86             has share => (
87             is => 'ro',
88             isa => InstanceOf['Net::Curl::Share'],
89             default => sub { Net::Curl::Share->new({ stamp => time }) },
90             lazy => 1,
91             );
92              
93              
94             has stats => (is => 'ro', isa => InstanceOf['AnyEvent::Net::Curl::Queued::Stats'], default => sub { AnyEvent::Net::Curl::Queued::Stats->new }, lazy => 1);
95              
96              
97             has timeout => (is => 'ro', isa => Num, default => sub { 60.0 });
98              
99              
100             has unique => (is => 'ro', isa => HashRef[Str], default => sub { {} });
101              
102              
103             has watchdog => (is => 'ro', isa => AnyOf[ArrayRef, Object], writer => 'set_watchdog', clearer => 'clear_watchdog', predicate => 'has_watchdog', weak_ref => 0);
104              
105              
106             sub BUILD {
107             my ($self) = @_;
108              
109             $self->set_multi(
110             AnyEvent::Net::Curl::Queued::Multi->new({
111             max => $self->max,
112             timeout => $self->timeout,
113             })
114             );
115              
116             $self->share->setopt(Net::Curl::Share::CURLSHOPT_SHARE, Net::Curl::Share::CURL_LOCK_DATA_COOKIE); # 2
117             $self->share->setopt(Net::Curl::Share::CURLSHOPT_SHARE, Net::Curl::Share::CURL_LOCK_DATA_DNS); # 3
118              
119             ## no critic (RequireCheckingReturnValueOfEval)
120             eval { $self->share->setopt(Net::Curl::Share::CURLSHOPT_SHARE, Net::Curl::Share::CURL_LOCK_DATA_SSL_SESSION) };
121              
122             return;
123             }
124              
125             sub BUILDARGS {
126             my $class = shift;
127             if (@_ == 1 and q(HASH) eq ref $_[0]) {
128             return shift;
129             } elsif (@_ % 2 == 0) {
130             return { @_ };
131             } elsif (@_ == 1) {
132             return { max => shift };
133             } else {
134             confess 'Should be initialized as ' . $class . '->new(Hash|HashRef|Int)';
135             }
136             }
137              
138              
139             sub start {
140             my ($self) = @_;
141              
142             # watchdog
143             $self->set_watchdog(AE::timer 1, 1, sub {
144             $self->multi->perform;
145             $self->empty;
146             });
147              
148             # populate queue
149             $self->add($self->dequeue)
150             while
151             $self->count
152             and ($self->multi->handles < $self->max);
153              
154             # check if queue is empty
155             $self->empty;
156              
157             return;
158             }
159              
160              
161             sub empty {
162             my ($self) = @_;
163              
164             AE::postpone { $self->cv->send }
165             if
166             $self->completed > 0
167             and $self->count == 0
168             and $self->multi->handles == 0;
169              
170             return;
171             }
172              
173              
174              
175             sub add {
176             my ($self, $worker) = @_;
177              
178             # vivify the worker
179             $worker = $worker->()
180             if ref($worker) eq 'CODE';
181              
182             # self-reference & warmup
183             $worker->queue($self);
184             $worker->init;
185              
186             # check if already processed
187             if ($self->allow_dups
188             or $worker->force
189             or ++$self->unique->{$worker->unique} == 1
190             ) {
191             # fire
192             $self->multi->add_handle($worker);
193             }
194              
195             return;
196             }
197              
198              
199             sub append {
200             my ($self, $worker) = @_;
201              
202             $self->queue_push($worker);
203             $self->start;
204              
205             return;
206             }
207              
208              
209             sub prepend {
210             my ($self, $worker) = @_;
211              
212             $self->queue_unshift($worker);
213             $self->start;
214              
215             return;
216             }
217              
218              
219             ## no critic (ProhibitBuiltinHomonyms)
220             sub wait {
221             my ($self) = @_;
222              
223             # handle queue
224             $self->cv->recv;
225              
226             # stop the watchdog
227             $self->clear_watchdog;
228              
229             # reload
230             $self->set_cv(AE::cv);
231              
232             return;
233             }
234              
235              
236             1;
237              
238             __END__