File Coverage

blib/lib/HTTP/LoadGen.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package HTTP::LoadGen;
2              
3 3     3   48894 use 5.008008;
  3         20  
  3         506  
4 3     3   22 use strict;
  3         7  
  3         161  
5 3     3   20 no warnings qw/uninitialized/;
  3         5  
  3         193  
6              
7             sub tlscache ();
8             sub conncache ();
9              
10 3     3   2791 use HTTP::LoadGen::Run;
  0            
  0            
11             BEGIN{ HTTP::LoadGen::Run::_dbg->import }
12              
13             use Coro;
14             use Coro::Semaphore ();
15             use Coro::Specific ();
16             use Coro::Timer ();
17             use Coro::Handle;
18             use AnyEvent;
19             use AnyEvent::TLS;
20             use Exporter ();
21             use Scalar::Util ();
22              
23             {our $VERSION = '0.07';}
24              
25             BEGIN {
26             our %EXPORT_TAGS=
27             (
28             common=>[qw!loadgen threadnr done userdata options rng rnd delay
29             register_iterator get_iterator follow_3XX!],
30             const=>\@HTTP::LoadGen::Run::EXPORT,
31             );
32             my %seen;
33             foreach my $v (values %EXPORT_TAGS) {
34             undef @seen{@$v} if @$v;
35             }
36             our @EXPORT_OK=@{$EXPORT_TAGS{all}=[keys %seen]};
37             }
38              
39             use constant {
40             TD_USER=>0,
41             TD_RNG=>1,
42             TD_THREADNR=>2,
43             TD_DONE=>3,
44             TD_CONN_CACHE=>4,
45             TD_TLS_CACHE=>5,
46             };
47              
48             my $td; # thread specific data
49             our $o; # the global config hash
50              
51             sub rnd; # predeclaration
52             sub import {
53             my $name=shift;
54             local *export=\&Exporter::export;
55             Exporter::export_to_level $name, 1, $name, map {
56             my $what=$_; local $_;
57             if($what eq '-rand') {
58             *CORE::GLOBAL::rand=\&rnd;
59             ();
60             } elsif($what eq ':all') {
61             our %EXPORT_TAGS;
62             unless( exists $EXPORT_TAGS{sb} ) {
63             require HTTP::LoadGen::ScoreBoard;
64             require HTTP::LoadGen::Logger;
65             HTTP::LoadGen::ScoreBoard->import
66             (@HTTP::LoadGen::ScoreBoard::EXPORT_OK);
67             *get_logger=\&HTTP::LoadGen::Logger::get;
68             $EXPORT_TAGS{sb}=\@HTTP::LoadGen::ScoreBoard::EXPORT_OK;
69             $EXPORT_TAGS{log}=[qw!get_logger!];
70             my %seen;
71             foreach my $v (values %EXPORT_TAGS) {
72             undef @seen{@$v} if @$v;
73             }
74             our @EXPORT_OK=@{$EXPORT_TAGS{all}=[keys %seen]};
75             }
76             $what;
77             } else {
78             $what;
79             }
80             } @_;
81             }
82              
83             sub create_proc {
84             my ($how_many, $init, $handler, $exit)=@_;
85              
86             AnyEvent::detect;
87              
88             my @watcher;
89             my %status;
90             my $sem=Coro::Semaphore->new;
91              
92             pipe my($r, $w);
93             pipe my($r2, $w2);
94              
95             for( my $i=0; $i<$how_many; $i++ ) {
96             my $pid;
97             select undef, undef, undef, 0.1 until defined ($pid=fork);
98             unless($pid) {
99             close $r;
100             close $w2;
101             $r2=unblock $r2;
102             $init->($i) if $init;
103             close $w; # signal parent
104             $r2->readable; # wait for start signal
105             undef $r2;
106             my $rc=$handler->($i);
107              
108             exit $exit->($i, $rc) if $exit;
109             exit $rc;
110             }
111             push @watcher, AE::child $pid, sub {
112             $status{$_[0]}=[($_[1]>>8)&0xff, $_[1]&0x7f, $_[1]&0x80];
113             $sem->up;
114             };
115             $sem->adjust(-1);
116             }
117              
118             close $w;
119             unblock($r)->readable; # wait for children to finish ChildInit
120              
121             return [$w2, $sem, \@watcher, \%status];
122             }
123              
124             sub start_proc {
125             my ($arr)=@_;
126             close $arr->[0];
127             $arr->[1]->down;
128             return $arr->[3];
129             }
130              
131             sub _start_thr {
132             my ($threadnr, $sem, $handler)=@_;
133             $sem->adjust(-1);
134             async {
135             $handler->(@_);
136             $sem->up;
137             } $threadnr;
138             }
139              
140             sub ramp_up {
141             my ($procnr, $nproc, $start, $max, $duration, $handler)=@_;
142              
143             $duration=300 if $duration<=0;
144              
145             # begin with $start (system total) threads and start over a period
146             # of $duration seconds up to $max threads.
147              
148             my $sem=Coro::Semaphore->new(1);
149             my $initial_sleep=($nproc + $procnr - $start % $nproc) % $nproc + 1;
150              
151             my $i=$procnr;
152             for(; $i<$start; $i+=$nproc ) {
153             _start_thr $i, $sem, $handler;
154             }
155              
156             return $sem if $i>=$max;
157              
158             my $interval=$duration/($max-$start);
159             $initial_sleep*=$interval;
160             $interval*=$nproc;
161              
162             my $cb=Coro::rouse_cb;
163              
164             my $tm;
165             $tm=AE::timer $initial_sleep, $interval, sub {
166             _start_thr $i, $sem, $handler;
167             $i+=$nproc;
168             unless ($i<$max) {
169             undef $tm;
170             $cb->();
171             }
172             };
173             Coro::rouse_wait;
174              
175             return $sem;
176             }
177              
178             sub tlscache () {$$td->[TD_TLS_CACHE]}
179             sub conncache () {$$td->[TD_CONN_CACHE]}
180             sub threadnr () {$$td->[TD_THREADNR]}
181             sub done () : lvalue {$$td->[TD_DONE]}
182             sub userdata () : lvalue {$$td->[TD_USER]}
183             sub options () {$o}
184             sub rng () : lvalue {$$td->[TD_RNG]}
185              
186             sub rnd (;$) {
187             my $rng=rng;
188             (ref $rng eq 'CODE' ? $rng->($_[0]||1) :
189             ref $rng ? $rng->rand($_[0]||1) :
190             CORE::rand $_[0]);
191             }
192              
193             sub delay {
194             my ($prefix, $param)=@_;
195             return if delete $param->{'skip'.$prefix.'delay'};
196             return unless exists $param->{$prefix.'delay'};
197             my $sec=$param->{$prefix.'delay'};
198             if( exists $param->{$prefix.'jitter'} ) {
199             my $jitter=$param->{$prefix.'jitter'};
200             $sec+=-$jitter+rnd(2*$jitter);
201             }
202             #D warn "\u${prefix}Delay: $sec sec\n";
203             Coro::Timer::sleep $sec if $sec>0;
204             }
205              
206             my (%services, %known_iterators);
207              
208             sub register_iterator {
209             my $code=pop;
210             if( Scalar::Util::reftype $code eq 'CODE' ) {
211             @known_iterators{@_}=($code)x(+@_);
212             } else {
213             die "CODE reference expected";
214             }
215             }
216              
217             sub get_iterator {
218             my ($name)=@_;
219             exists $known_iterators{$name} and return $known_iterators{$name};
220             return $known_iterators{''};
221             }
222              
223             {
224             my %keep=('user-agent'=>1, 'referer'=>1);
225             sub follow_3XX {
226             my ($rc, $el)=@_;
227              
228             # we are stricter here than most browsers because we do not follow
229             # partial URLs.
230             if( $rc->[RC_STATUS]=~/^3/ and
231             exists $rc->[RC_HEADERS]->{location} and
232             $rc->[RC_HEADERS]->{location}->[0]=~m!^(https?):// # scheme
233             ([^:/]+) # host
234             (:[0-9]+)? # optional port
235             (.*)!ix ) { # uri
236             # follow location
237             my $scheme=lc($1);
238             my $host=$2;
239             my $port=$3||$services{$scheme};
240             my $uri=$4||'/';
241              
242             my @h;
243             if( exists $el->[RQ_PARAM]->{headers} ) {
244             my $hdr=$el->[RQ_PARAM]->{headers};
245             for (my $i=0; $i<@$hdr; $i+=2) {
246             push @h, $hdr->[$i], $hdr->[$i+1] if exists $keep{lc $hdr->[$i]};
247             }
248             }
249              
250             return ['GET', $scheme, $host, $port, $uri,
251             {keepalive=>KEEPALIVE, followed=>1, headers=>\@h}];
252             }
253             }
254             }
255              
256             BEGIN {
257             %services=(http=>80, https=>443);
258              
259             register_iterator '', default=>sub {
260             my $urls=options->{URLList};
261             my $nurls=@$urls;
262             my $i=0;
263             return sub {
264             return if $i>=$nurls;
265             return $urls->[$i++];
266             };
267             };
268              
269             register_iterator random_start=>sub {
270             my $urls=options->{URLList};
271             my $nurls=@$urls;
272             my ($i, $off)=(0, int rnd $nurls);
273             return sub {
274             return if $i>=$nurls;
275             return $urls->[($off+$i++) % $nurls];
276             };
277             };
278              
279             register_iterator follow=>sub {
280             my %save_delay;
281             my $it=@_ ? $_[0] : get_iterator('')->();
282             return sub {
283             my ($rc, $el)=@_;
284              
285             my $next=follow_3XX $rc, $el;
286             return $next if $next;
287              
288             delay 'post', \%save_delay;
289              
290             # get next request
291             $next=$it->($rc, $el);
292             return unless $next;;
293              
294             # save postdelay
295             if( exists $next->[RQ_PARAM]->{postdelay} ) {
296             $save_delay{postdelay}=$next->[RQ_PARAM]->{postdelay};
297             $save_delay{postjitter}=$next->[RQ_PARAM]->{postjitter}
298             if exists $next->[RQ_PARAM]->{postjitter};
299             $next->[RQ_PARAM]->{skippostdelay}=1;
300             }
301              
302             return $next;
303             };
304             };
305              
306             register_iterator random_start_follow=>sub {
307             @_=get_iterator('random_start')->();
308             goto &{get_iterator 'follow'};
309             };
310             }
311              
312             sub loadgen {
313             local $o=+{@_==1 ? %{$_[0]} : @_};
314              
315             my $nproc=($o->{NWorker}||=1);
316              
317             die "'URLList' or 'InitURLs' invalid"
318             unless (exists $o->{InitURLs} &&
319             Scalar::Util::reftype $o->{InitURLs} eq 'CODE' or
320             exists $o->{URLList} && (!exists $o->{InitURLs} ||
321             exists $known_iterators{$o->{InitURLs}}));
322              
323             my $init=sub {
324             my ($procnr)=@_;
325              
326             $td=Coro::Specific->new(); # thread specific data
327              
328             AnyEvent::TLS::init;
329              
330             HTTP::LoadGen::Run::dnscache=$o->{dnscache} if exists $o->{dnscache};
331             $o->{ProcInit}->($procnr) if exists $o->{ProcInit};
332              
333             $o->{before}=sub {
334             my ($el)=@_;
335             delay 'pre', $el->[5];
336             $o->{ReqStart}->($el) if exists $o->{ReqStart};
337             };
338              
339             $o->{after}=sub {
340             my ($rc, $el, $connh)=@_;
341             $o->{ReqDone}->($rc, $el, $connh) if exists $o->{ReqDone};
342             return 1 if done;
343             delay 'post', $el->[5];
344             return;
345             };
346              
347             if( exists $o->{InitURLs} ) {
348             $o->{InitURLs}=$known_iterators{$o->{InitURLs}} unless ref $o->{InitURLs};
349             } else {
350             $o->{InitURLs}=$known_iterators{''};
351             }
352             };
353             my $exit;
354             $exit=$o->{ProcExit} if exists $o->{ProcExit};
355              
356             $o->{ParentInit}->() if exists $o->{ParentInit};
357              
358             start_proc create_proc $nproc, $init, sub {
359             my ($procnr)=@_;
360              
361             ramp_up($procnr, $nproc, $o->{RampUpStart}||$nproc,
362             $o->{RampUpMax}||$nproc, $o->{RampUpDuration}||300, sub {
363             my ($threadnr)=@_;
364              
365             my $data=[];
366             $$td=$data;
367              
368             $data->[TD_CONN_CACHE]={};
369             $data->[TD_TLS_CACHE]={};
370             $data->[TD_THREADNR]=$threadnr;
371             $data->[TD_USER]=$o->{ThreadInit}->() if exists $o->{ThreadInit};
372              
373             HTTP::LoadGen::Run::run_urllist $o;
374              
375             $o->{ThreadExit}->() if exists $o->{ThreadExit};
376             })->down;
377              
378             return 0;
379             }, $exit;
380              
381             $o->{ParentExit}->() if exists $o->{ParentExit};
382             }
383              
384             1;