File Coverage

blib/lib/App/cdnget/Worker.pm
Criterion Covered Total %
statement 33 35 94.2
branch n/a
condition n/a
subroutine 12 12 100.0
pod n/a
total 45 47 95.7


line stmt bran cond sub pod time code
1             package App::cdnget::Worker;
2 1     1   3 use Object::Base;
  1         1  
  1         4  
3 1     1   448 use v5.14;
  1         2  
4 1     1   3 use bytes;
  1         1  
  1         4  
5 1     1   77 use IO::Handle;
  1         1  
  1         26  
6 1     1   512 use FileHandle;
  1         2718  
  1         4  
7 1     1   432 use Time::HiRes qw(sleep usleep);
  1         2  
  1         8  
8 1     1   673 use Thread::Semaphore;
  1         743  
  1         26  
9 1     1   505 use FCGI;
  1         765  
  1         22  
10 1     1   5 use Digest::MD5;
  1         1  
  1         15  
11              
12 1     1   63 use App::cdnget;
  1         1  
  1         17  
13 1     1   3 use App::cdnget::Exception;
  1         1  
  1         14  
14 1     1   418 use App::cdnget::Downloader;
  0            
  0            
15              
16              
17             BEGIN
18             {
19             our $VERSION = '0.05';
20             }
21              
22              
23             my $maxCount;
24             my $spareCount;
25             my $addr = 0;
26             my $cachePath;
27              
28             my $terminating :shared = 0;
29             my $terminated :shared = 0;
30             my $workerSemaphore :shared;
31             my $spareSemaphore :shared;
32             my $accepterSemaphore :shared;
33             my $accepterCount :shared = 0;
34             my $socket = 0;
35              
36              
37             attributes qw(:shared tid);
38              
39              
40             sub init
41             {
42             my ($_spareCount, $_maxCount, $_cachePath, $_addr) = @_;
43             $spareCount = $_spareCount;
44             $maxCount = $_maxCount;
45             $cachePath = $_cachePath;
46             $cachePath = substr($cachePath, 0, length($cachePath)-1) while $cachePath and substr($cachePath, -1) eq "/";
47             $addr = $_addr;
48             $workerSemaphore = Thread::Semaphore->new($maxCount) or App::cdnget::Exception->throw($!);
49             $spareSemaphore = Thread::Semaphore->new($spareCount) or App::cdnget::Exception->throw($!);
50             $accepterSemaphore = Thread::Semaphore->new($spareCount) or App::cdnget::Exception->throw($!);
51             $socket = FCGI::OpenSocket($addr, $maxCount) or App::cdnget::Exception->throw($!) if $addr;
52             return 1;
53             }
54              
55             sub final
56             {
57             FCGI::CloseSocket($socket) if $socket;
58             $socket = 0;
59             return 1;
60             }
61              
62             sub terminate
63             {
64             do
65             {
66             lock($terminating);
67             return 0 if $terminating;
68             $terminating = 1;
69             };
70             App::cdnget::log_info("Workers terminating...");
71             my $gracefully = 0;
72             while (not $gracefully and not $App::cdnget::terminating_force)
73             {
74             $gracefully = $workerSemaphore->down_timed(3, $maxCount);
75             }
76             lock($terminated);
77             $terminated = 1;
78             App::cdnget::log_info("Workers terminated".($gracefully? " gracefully": "").".");
79             return 1;
80             }
81              
82             sub terminating
83             {
84             lock($terminating);
85             return $terminating;
86             }
87              
88             sub terminated
89             {
90             if (@_ > 0)
91             {
92             my $self = shift;
93             lock($self);
94             return defined($self->tid)? 0: 1;
95             }
96             lock($terminated);
97             return $terminated;
98             }
99              
100             sub new
101             {
102             my $class = shift;
103             while (not $spareSemaphore->down_timed(1))
104             {
105             if (terminating())
106             {
107             return;
108             }
109             }
110             while (not $workerSemaphore->down_timed(1))
111             {
112             if (terminating())
113             {
114             $spareSemaphore->up();
115             return;
116             }
117             }
118             if (terminating())
119             {
120             $spareSemaphore->up();
121             $workerSemaphore->up();
122             return;
123             }
124             my $self = $class->SUPER();
125             $self->tid = undef;
126             do
127             {
128             lock($self);
129             my $thr = threads->create(\&run, $self) or $self->throw($!);
130             cond_wait($self);
131             unless (defined($self->tid))
132             {
133             App::cdnget::Exception->throw($thr->join());
134             }
135             $thr->detach();
136             };
137             return $self;
138             }
139              
140             sub DESTROY
141             {
142             my $self = shift;
143             $self->SUPER::DESTROY;
144             }
145              
146             sub throw
147             {
148             my $self = shift;
149             my ($msg) = @_;
150             unless (ref($msg))
151             {
152             $msg = "Unknown" unless $msg;
153             $msg = "Worker ".
154             $msg;
155             }
156             App::cdnget::Exception->throw($msg, 1);
157             }
158              
159             sub worker
160             {
161             my $self = shift;
162             my ($req) = @_;
163             my ($in, $out, $err) = $req->GetHandles();
164             my $env = $req->GetEnvironment();
165              
166             my $id = $env->{CDNGET_ID};
167             $self->throw("Invalid ID") unless defined($id);
168             $id = ($id =~ /^(.*)/)[0];
169             $id =~ s/^\s+|\s+$//g;
170             $self->throw("Invalid ID") unless $id =~ /^\w+$/i;
171              
172             my $origin = $env->{CDNGET_ORIGIN};
173             $self->throw("Invalid origin") unless defined($origin);
174             $origin = ($origin =~ /^(.*)/)[0];
175             $origin =~ s/^\s+|\s+$//g;
176             $origin = URI->new($origin);
177             $self->throw("Invalid origin scheme") unless $origin->scheme =~ /^http|https$/i;
178             $origin->path(substr($origin->path, 0, length($origin->path)-1)) while $origin->path and substr($origin->path, -1) eq "/";
179              
180             my $uri = $env->{CDNGET_URI};
181             $self->throw("Invalid URI") unless defined($uri);
182             $uri = ($uri =~ /^(.*)/)[0];
183             $uri =~ s/^\s+|\s+$//g;
184             $uri = "/$uri" unless $uri and substr($uri, 0, 1) eq "/";
185              
186             my $hook = $env->{CDNGET_HOOK};
187             $hook = "" unless defined($hook);
188             $hook = ($hook =~ /^(.*)/)[0];
189             $hook =~ s/^\s+|\s+$//g;
190              
191             my $url = $origin->scheme."://".$origin->host_port.$origin->path.$uri;
192             my $digest = Digest::MD5::md5_hex("$url $hook");
193             my $uid = "$id/$digest";
194             my $path = "$cachePath/$id";
195             mkdir($path);
196             my @dirs = $digest =~ /(..)(.)$/;
197             my $file = $digest;
198             for (reverse @dirs)
199             {
200             $path .= "/$_";
201             mkdir($path);
202             }
203             $self->throw("Cache directory not exists") unless -d $path;
204             $path .= "/$file";
205              
206             my $fh;
207             my $downloader;
208             do
209             {
210             lock(%App::cdnget::Downloader::uids);
211             $fh = FileHandle->new($path, "<");
212             unless ($fh)
213             {
214             return unless App::cdnget::Downloader->new($uid, $path, $url, $hook);
215             $fh = FileHandle->new($path, "<") or $self->throw($!);
216             }
217             $downloader = $App::cdnget::Downloader::uids{$uid};
218             };
219             $fh->binmode(":bytes") or $self->throw($!);
220              
221             do
222             {
223             local ($/, $\) = ("\r\n")x2;
224             my $line;
225             my $buf;
226             my $empty = 1;
227             while (not $self->terminating)
228             {
229             threads->yield();
230             my $downloaderTerminated = ! $downloader || $downloader->terminated;
231             $line = $fh->getline;
232             unless (defined($line))
233             {
234             $self->throw($!) if $fh->error;
235             return if $downloaderTerminated;
236             my $pos = $fh->tell;
237             usleep(1*1000);
238             $fh->seek($pos, 0) or $self->throw($!);
239             next;
240             }
241             chomp $line;
242             unless ($line =~ /^(Client\-)/i)
243             {
244             if (not $out->print("$line\r\n"))
245             {
246             not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
247             return;
248             }
249             $empty = 0;
250             }
251             last unless $line;
252             }
253             while (not $self->terminating)
254             {
255             threads->yield();
256             my $downloaderTerminated = ! $downloader || $downloader->terminated;
257             my $len = $fh->read($buf, $App::cdnget::CHUNK_SIZE);
258             $self->throw($!) unless defined($len);
259             if ($len == 0)
260             {
261             return if $downloaderTerminated;
262             my $pos = $fh->tell;
263             usleep(1*1000);
264             $fh->seek($pos, 0) or $self->throw($!);
265             next;
266             }
267             if (not $out->write($buf, $len))
268             {
269             not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
270             return;
271             }
272             $empty = 0;
273             }
274             if ($empty)
275             {
276             if (not $out->print("Status: 404\r\n"))
277             {
278             not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
279             return;
280             }
281             }
282             };
283             return;
284             }
285              
286             sub run
287             {
288             my $self = shift;
289             my $tid = threads->tid();
290              
291             $self->tid = $tid;
292             do
293             {
294             lock($self);
295             cond_signal($self);
296             };
297              
298             my $spare = 1;
299             my $accepting = 0;
300             eval
301             {
302             my ($in, $out, $err) = (IO::Handle->new(), IO::Handle->new(), IO::Handle->new());
303             my $env = {};
304             my $req = FCGI::Request($in, $out, $err, $env, $socket, FCGI::FAIL_ACCEPT_ON_INTR) or $self->throw($!);
305              
306             wait_accept:
307             while (not $self->terminating)
308             {
309             $accepterSemaphore->down_timed(1);
310             do
311             {
312             lock($accepterCount);
313             last wait_accept unless $accepterCount >= $spareCount;
314             };
315             }
316             $spareSemaphore->up();
317             $spare = 0;
318              
319             accepter_loop:
320             while (not $self->terminating)
321             {
322             threads->yield();
323             $workerSemaphore->up();
324             $accepting = 1;
325             my $accept;
326             do
327             {
328             lock($accepterCount);
329             $accepterCount++;
330             };
331             eval { $accept = $req->Accept() };
332             do
333             {
334             lock($accepterCount);
335             $accepterCount--;
336             };
337             $accepterSemaphore->up();
338             last unless $accept >= 0;
339             if ($self->terminating)
340             {
341             $req->Finish();
342             last;
343             }
344             $workerSemaphore->down();
345             $accepting = 0;
346             eval
347             {
348             $self->worker($req);
349             };
350             do
351             {
352             local $@;
353             $req->Finish();
354             };
355             if ($@)
356             {
357             die $@;
358             }
359             do
360             {
361             lock($accepterCount);
362             last accepter_loop if $accepterCount >= $spareCount;
363             };
364             }
365             };
366             do
367             {
368             local $@;
369             $workerSemaphore->up() unless $accepting;
370             $spareSemaphore->up() if $spare;
371             usleep(10*1000); #cond_wait bug
372             lock($self);
373             $self->tid = undef;
374             };
375             if ($@)
376             {
377             warn $@;
378             }
379             return;
380             }
381              
382              
383             1;
384             __END__
385             =head1 REPOSITORY
386              
387             B<GitHub> L<https://github.com/orkunkaraduman/p5-cdnget>
388              
389             B<CPAN> L<https://metacpan.org/release/App-cdnget>
390              
391             =head1 AUTHOR
392              
393             Orkun Karaduman <orkunkaraduman@gmail.com>
394              
395             =head1 COPYRIGHT AND LICENSE
396              
397             Copyright (C) 2017 Orkun Karaduman <orkunkaraduman@gmail.com>
398              
399             This program is free software: you can redistribute it and/or modify
400             it under the terms of the GNU General Public License as published by
401             the Free Software Foundation, either version 3 of the License, or
402             (at your option) any later version.
403              
404             This program is distributed in the hope that it will be useful,
405             but WITHOUT ANY WARRANTY; without even the implied warranty of
406             MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
407             GNU General Public License for more details.
408              
409             You should have received a copy of the GNU General Public License
410             along with this program. If not, see <http://www.gnu.org/licenses/>.
411              
412             =cut