File Coverage

lib/App/MtAws/QueueEngine.pm
Criterion Covered Total %
statement 59 62 95.1
branch 10 14 71.4
condition n/a
subroutine 10 12 83.3
pod 0 7 0.0
total 79 95 83.1


line stmt bran cond sub pod time code
1             # mt-aws-glacier - Amazon Glacier sync client
2             # Copyright (C) 2012-2014 Victor Efimov
3             # http://mt-aws.com (also http://vs-dev.com) vs@vs-dev.com
4             # License: GPLv3
5             #
6             # This file is part of "mt-aws-glacier"
7             #
8             # mt-aws-glacier is free software: you can redistribute it and/or modify
9             # it under the terms of the GNU General Public License as published by
10             # the Free Software Foundation, either version 3 of the License, or
11             # (at your option) any later version.
12             #
13             # mt-aws-glacier is distributed in the hope that it will be useful,
14             # but WITHOUT ANY WARRANTY; without even the implied warranty of
15             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16             # GNU General Public License for more details.
17             #
18             # You should have received a copy of the GNU General Public License
19             # along with this program. If not, see <http://www.gnu.org/licenses/>.
20              
21             package App::MtAws::QueueEngine;
22              
23             our $VERSION = '1.114_2';
24              
25 24     24   107512 use strict;
  24         32  
  24         693  
26 24     24   88 use warnings;
  24         38  
  24         561  
27              
28 24     24   83 use Carp;
  24         102  
  24         1112  
29 24     24   7011 use App::MtAws::QueueJobResult;
  24         44  
  24         11735  
30              
31             sub new
32             {
33 270     270 0 3118 my ($class, %args) = @_;
34 270         468 my $self = {};
35 270         424 bless $self, $class;
36 270         570 $self->{task_inc} = 0;
37 270         474 $self->{tasks} = undef;
38 270         473 $self->{freeworkers} = undef;
39 270         463 $self->{workers} = {};
40 270         1238 $self->init(%args);
41 270         1100 return $self;
42             }
43              
44 0     0 0 0 sub init { confess "Unimplemented" }
45 0     0 0 0 sub queue { confess "Unimplemented" }
46              
47             sub add_worker
48             {
49 740     740 0 3019 my ($self, $worker_id) = @_;
50 740         1970 $self->{workers}{$worker_id} = {};
51             }
52              
53             sub unqueue_task
54             {
55 1918     1918 0 29730 my ($self, $worker_id) = @_;
56 1918         3108 my $task_id = delete $self->{workers}{$worker_id}{task};
57 1918 50       4567 my $task = delete $self->{tasks}{$task_id} or confess;
58 1918         1604 push @{ $self->{freeworkers} }, $worker_id;
  1918         2624  
59 1918         3354 return $task;
60             }
61              
62             sub _next_task_id
63             {
64 1918     1918   1798 my ($self) = @_;
65 1918         2045 my $next_id = ++$self->{task_inc};
66 1918 50       3061 $next_id > 0 or confess;
67 1918         2087 $next_id;
68             }
69              
70             sub process
71             {
72 270     270 0 3042 my ($self, $job) = @_;
73 270 50       935 confess "code is not reentrant" if defined $self->{tasks};
74 270         452 $self->{tasks} = {};
75 270         330 @{$self->{freeworkers}} = keys %{$self->{workers}};
  270         769  
  270         922  
76 270         443 while () {
77 4106 100       3179 if (@{ $self->{freeworkers} }) {
  4106         6364  
78 2582         5531 my $res = $job->next;
79 2582 100       5113 if ($res->{code} eq JOB_OK) {
    100          
    50          
80 1918         3337 my $task_id = $self->_next_task_id;
81              
82 1918         1574 my $worker_id = shift @{ $self->{freeworkers} };
  1918         2635  
83 1918         2161 my $task = $res->{task};
84              
85 1918         2334 $task->{_id} = $task_id;
86 1918         4431 $self->queue($worker_id, $task);
87              
88 1918         4705 $self->{tasks}{$task_id} = $task;
89 1918         5067 $self->{workers}{$worker_id}{task} = $task_id;
90              
91             } elsif ($res->{code} eq JOB_WAIT) {
92 394         1047 $self->wait_worker();
93             } elsif ($res->{code} eq JOB_DONE) {
94 270         869 return $job
95             } else {
96 0         0 confess;
97             }
98             } else {
99 1524         2859 $self->wait_worker();
100             }
101             }
102             }
103              
104             sub get_busy_workers_ids
105             {
106 1918     1918 0 5223 my ($self) = @_;
107 1918         1571 grep { $self->{workers}{$_}{task} } keys %{ $self->{workers}};
  5687         11300  
  1918         3625  
108             }
109              
110             1;