File Coverage

blib/lib/App/cdnget/Worker.pm
Criterion Covered Total %
statement 33 35 94.2
branch n/a
condition n/a
subroutine 12 12 100.0
pod n/a
total 45 47 95.7


line stmt bran cond sub pod time code
1             package App::cdnget::Worker;
2 1     1   3 use Object::Base;
  1         1  
  1         5  
3 1     1   384 use v5.14;
  1         2  
4 1     1   3 use bytes;
  1         1  
  1         4  
5 1     1   14 use IO::Handle;
  1         1  
  1         24  
6 1     1   481 use FileHandle;
  1         3031  
  1         3  
7 1     1   502 use Time::HiRes qw(sleep usleep);
  1         1  
  1         8  
8 1     1   715 use Thread::Semaphore;
  1         728  
  1         26  
9 1     1   423 use FCGI;
  1         815  
  1         27  
10 1     1   431 use Digest::SHA;
  1         1875  
  1         20  
11              
12 1     1   71 use App::cdnget;
  1         2  
  1         25  
13 1     1   4 use App::cdnget::Exception;
  1         1  
  1         16  
14 1     1   432 use App::cdnget::Downloader;
  0            
  0            
15              
16              
17             BEGIN
18             {
19             our $VERSION = '0.04';
20             }
21              
22              
23             my $maxCount;
24             my $spareCount;
25             my $addr = 0;
26             my $cachePath;
27              
28             my $terminating :shared = 0;
29             my $terminated :shared = 0;
30             my $workerSemaphore :shared;
31             my $spareSemaphore :shared;
32             my $accepterSemaphore :shared;
33             my $accepterCount :shared = 0;
34             my $socket = 0;
35              
36              
37             attributes qw(:shared tid);
38              
39              
40             sub init
41             {
42             my ($_spareCount, $_maxCount, $_cachePath, $_addr) = @_;
43             $spareCount = $_spareCount;
44             $maxCount = $_maxCount;
45             $cachePath = $_cachePath;
46             $cachePath = substr($cachePath, 0, length($cachePath)-1) while $cachePath and substr($cachePath, -1) eq "/";
47             $addr = $_addr;
48             $workerSemaphore = Thread::Semaphore->new($maxCount) or App::cdnget::Exception->throw($!);
49             $spareSemaphore = Thread::Semaphore->new($spareCount) or App::cdnget::Exception->throw($!);
50             $accepterSemaphore = Thread::Semaphore->new($spareCount) or App::cdnget::Exception->throw($!);
51             $socket = FCGI::OpenSocket($addr, $maxCount) or App::cdnget::Exception->throw($!) if $addr;
52             return 1;
53             }
54              
55             sub final
56             {
57             FCGI::CloseSocket($socket) if $socket;
58             $socket = 0;
59             return 1;
60             }
61              
62             sub terminate
63             {
64             do
65             {
66             lock($terminating);
67             return 0 if $terminating;
68             $terminating = 1;
69             };
70             App::cdnget::log_info("Workers terminating...");
71             my $gracefully = 0;
72             while (not $gracefully and not $App::cdnget::terminating_force)
73             {
74             $gracefully = $workerSemaphore->down_timed(3, $maxCount);
75             }
76             lock($terminated);
77             $terminated = 1;
78             App::cdnget::log_info("Workers terminated".($gracefully? " gracefully": "").".");
79             return 1;
80             }
81              
82             sub terminating
83             {
84             lock($terminating);
85             return $terminating;
86             }
87              
88             sub terminated
89             {
90             if (@_ > 0)
91             {
92             my $self = shift;
93             lock($self);
94             return defined($self->tid)? 0: 1;
95             }
96             lock($terminated);
97             return $terminated;
98             }
99              
100             sub new
101             {
102             my $class = shift;
103             while (not $spareSemaphore->down_timed(1))
104             {
105             if (terminating())
106             {
107             return;
108             }
109             }
110             while (not $workerSemaphore->down_timed(1))
111             {
112             if (terminating())
113             {
114             $spareSemaphore->up();
115             return;
116             }
117             }
118             if (terminating())
119             {
120             $spareSemaphore->up();
121             $workerSemaphore->up();
122             return;
123             }
124             my $self = $class->SUPER();
125             $self->tid = undef;
126             do
127             {
128             lock($self);
129             my $thr = threads->create(\&run, $self) or $self->throw($!);
130             cond_wait($self);
131             unless (defined($self->tid))
132             {
133             App::cdnget::Exception->throw($thr->join());
134             }
135             $thr->detach();
136             };
137             return $self;
138             }
139              
140             sub DESTROY
141             {
142             my $self = shift;
143             $self->SUPER::DESTROY;
144             }
145              
146             sub throw
147             {
148             my $self = shift;
149             my ($msg) = @_;
150             unless (ref($msg))
151             {
152             $msg = "Unknown" unless $msg;
153             $msg = "Worker ".
154             $msg;
155             }
156             App::cdnget::Exception->throw($msg, 1);
157             }
158              
159             sub worker
160             {
161             my $self = shift;
162             my ($req) = @_;
163             my ($in, $out, $err) = $req->GetHandles();
164             my $env = $req->GetEnvironment();
165              
166             my $id = $env->{CDNGET_ID};
167             $self->throw("Invalid ID") unless defined($id) and $id =~ /^\w+$/i;
168              
169             my $origin = $env->{CDNGET_ORIGIN};
170             $self->throw("Invalid origin") unless defined($origin);
171             $origin = URI->new($origin);
172             $self->throw("Invalid origin scheme") unless $origin->scheme =~ /^http|https$/i;
173             $origin->path(substr($origin->path, 0, length($origin->path)-1)) while $origin->path and substr($origin->path, -1) eq "/";
174              
175             my $uri = $env->{CDNGET_URI};
176             $self->throw("Invalid URI") unless defined($uri);
177             $uri = "/$uri" unless $uri and substr($uri, 0, 1) eq "/";
178              
179             my $hook = $env->{CDNGET_HOOK};
180             $hook = "" unless defined($hook);
181              
182             my $url = $origin->scheme."://".$origin->host_port.$origin->path.$uri;
183             my $digest = Digest::SHA::sha256_hex("$url $hook");
184             my $uid = "$id/$digest";
185             my $path = "$cachePath/$id";
186             mkdir($path);
187             my @dirs = $digest =~ /..../g;
188             my $file = pop @dirs;
189             for (@dirs)
190             {
191             $path .= "/$_";
192             mkdir($path);
193             }
194             $self->throw("Cache directory not exists") unless -d $path;
195             $path .= "/$file";
196              
197             my $fh;
198             my $downloader;
199             do
200             {
201             lock(%App::cdnget::Downloader::uids);
202             $fh = FileHandle->new($path, "<");
203             unless ($fh)
204             {
205             return unless App::cdnget::Downloader->new($uid, $path, $url, $hook);
206             $fh = FileHandle->new($path, "<") or $self->throw($!);
207             }
208             $downloader = $App::cdnget::Downloader::uids{$uid};
209             };
210             $fh->binmode(":bytes") or $self->throw($!);
211              
212             do
213             {
214             local ($/, $\) = ("\r\n")x2;
215             my $line;
216             my $buf;
217             my $empty = 1;
218             while (not $self->terminating)
219             {
220             threads->yield();
221             my $downloaderTerminated = ! $downloader || $downloader->terminated;
222             $line = $fh->getline;
223             unless (defined($line))
224             {
225             $self->throw($!) if $fh->error;
226             return if $downloaderTerminated;
227             my $pos = $fh->tell;
228             usleep(1*1000);
229             $fh->seek($pos, 0) or $self->throw($!);
230             next;
231             }
232             chomp $line;
233             unless ($line =~ /^(Client\-)/i)
234             {
235             if (not $out->print("$line\r\n"))
236             {
237             not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
238             return;
239             }
240             $empty = 0;
241             }
242             last unless $line;
243             }
244             while (not $self->terminating)
245             {
246             threads->yield();
247             my $downloaderTerminated = ! $downloader || $downloader->terminated;
248             my $len = $fh->read($buf, $App::cdnget::CHUNK_SIZE);
249             $self->throw($!) unless defined($len);
250             if ($len == 0)
251             {
252             return if $downloaderTerminated;
253             my $pos = $fh->tell;
254             usleep(1*1000);
255             $fh->seek($pos, 0) or $self->throw($!);
256             next;
257             }
258             if (not $out->write($buf, $len))
259             {
260             not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
261             return;
262             }
263             $empty = 0;
264             }
265             if ($empty)
266             {
267             if (not $out->print("Status: 404\r\n"))
268             {
269             not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
270             return;
271             }
272             }
273             };
274             return;
275             }
276              
277             sub run
278             {
279             my $self = shift;
280             my $tid = threads->tid();
281              
282             $self->tid = $tid;
283             do
284             {
285             lock($self);
286             cond_signal($self);
287             };
288              
289             my $spare = 1;
290             my $accepting = 0;
291             eval
292             {
293             my ($in, $out, $err) = (IO::Handle->new(), IO::Handle->new(), IO::Handle->new());
294             my $env = {};
295             my $req = FCGI::Request($in, $out, $err, $env, $socket, FCGI::FAIL_ACCEPT_ON_INTR) or $self->throw($!);
296              
297             wait_accept:
298             while (not $self->terminating)
299             {
300             $accepterSemaphore->down_timed(1);
301             do
302             {
303             lock($accepterCount);
304             last wait_accept unless $accepterCount >= $spareCount;
305             };
306             }
307             $spareSemaphore->up();
308             $spare = 0;
309              
310             accepter_loop:
311             while (not $self->terminating)
312             {
313             threads->yield();
314             $workerSemaphore->up();
315             $accepting = 1;
316             my $accept;
317             do
318             {
319             lock($accepterCount);
320             $accepterCount++;
321             };
322             eval { $accept = $req->Accept() };
323             do
324             {
325             lock($accepterCount);
326             $accepterCount--;
327             };
328             $accepterSemaphore->up();
329             last unless $accept >= 0;
330             if ($self->terminating)
331             {
332             $req->Finish();
333             last;
334             }
335             $workerSemaphore->down();
336             $accepting = 0;
337             eval
338             {
339             $self->worker($req);
340             };
341             do
342             {
343             local $@;
344             $req->Finish();
345             };
346             if ($@)
347             {
348             die $@;
349             }
350             do
351             {
352             lock($accepterCount);
353             last accepter_loop if $accepterCount >= $spareCount;
354             };
355             }
356             };
357             do
358             {
359             local $@;
360             $workerSemaphore->up() unless $accepting;
361             $spareSemaphore->up() if $spare;
362             usleep(10*1000); #cond_wait bug
363             lock($self);
364             $self->tid = undef;
365             };
366             if ($@)
367             {
368             warn $@;
369             }
370             return;
371             }
372              
373              
374             1;
375             __END__
376             =head1 REPOSITORY
377              
378             B<GitHub> L<https://github.com/orkunkaraduman/p5-cdnget>
379              
380             B<CPAN> L<https://metacpan.org/release/App-cdnget>
381              
382             =head1 AUTHOR
383              
384             Orkun Karaduman <orkunkaraduman@gmail.com>
385              
386             =head1 COPYRIGHT AND LICENSE
387              
388             Copyright (C) 2017 Orkun Karaduman <orkunkaraduman@gmail.com>
389              
390             This program is free software: you can redistribute it and/or modify
391             it under the terms of the GNU General Public License as published by
392             the Free Software Foundation, either version 3 of the License, or
393             (at your option) any later version.
394              
395             This program is distributed in the hope that it will be useful,
396             but WITHOUT ANY WARRANTY; without even the implied warranty of
397             MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
398             GNU General Public License for more details.
399              
400             You should have received a copy of the GNU General Public License
401             along with this program. If not, see <http://www.gnu.org/licenses/>.
402              
403             =cut