File Coverage

blib/lib/Qless/Queue.pm
Criterion Covered Total %
statement 18 81 22.2
branch 0 14 0.0
condition 0 33 0.0
subroutine 6 21 28.5
pod 10 14 71.4
total 34 163 20.8


line stmt bran cond sub pod time code
1             package Qless::Queue;
2             =head1 NAME
3              
4             Qless:Queue
5              
6             =cut
7              
8 1     1   4 use strict; use warnings;
  1     1   1  
  1         23  
  1         5  
  1         1  
  1         30  
9 1     1   4 use JSON::XS qw(decode_json encode_json);
  1         2  
  1         42  
10 1     1   524 use Qless::Jobs;
  1         2  
  1         22  
11 1     1   439 use Qless::Job;
  1         4  
  1         47  
12 1     1   8 use Time::HiRes qw();
  1         3  
  1         930  
13              
14             =head1 METHODS
15              
16             =head2 C
17             =cut
18             sub new {
19 0     0 1   my $class = shift;
20 0           my ($name, $client, $worker_name) = @_;
21              
22 0 0         $class = ref $class if ref $class;
23 0           my $self = bless {}, $class;
24              
25 0           $self->{'name'} = $name;
26 0           $self->{'client'} = $client;
27 0           $self->{'worker_name'} = $worker_name;
28              
29 0           $self;
30             }
31              
32             sub generate_jid {
33 0     0 0   my ($self, $data) = @_;
34 0           return $self->{'worker_name'}.'-'.CORE::time.'-'.sprintf('%06d', int(rand(999999)));
35             }
36              
37 0     0 0   sub client { $_[0]->{'client'} }
38 0     0 0   sub name { $_[0]->{'name'} }
39 0     0 0   sub worker_name { $_[0]->{'worker_name'} }
40              
41              
42             =head2 C
43              
44             =cut
45              
46             sub jobs {
47 0     0 1   my ($self) = @_;
48 0           Qless::Jobs->new($self->{'name'}, $self->{'client'});
49             }
50              
51             =head2 C
52              
53             =cut
54              
55             sub counts {
56 0     0 1   my ($self) = @_;
57 0           return decode_json($self->{'client'}->_queues([], Time::HiRes::time, $self->{'name'}));
58             }
59              
60             =head2 C
61             =cut
62             sub heartbeat {
63 0     0 1   my ($self, $new_value) = @_;
64              
65 0           my $config = $self->{'client'}->config;
66              
67 0 0         if (defined $new_value) {
68 0           $config->set($self->{'name'}.'-heartbeat', $new_value);
69 0           return;
70             }
71              
72 0   0       return $config->get($self->{'name'}.'-heartbeat') || $config->get('heartbeat') || 60;
73             }
74              
75             =head2 C
76             =cut
77             sub put {
78 0     0 1   my ($self, $klass, $data, %args ) = @_;
79              
80 0   0       return $self->{'client'}->_put([$self->{'name'}],
      0        
      0        
      0        
      0        
      0        
81             $args{'jid'} || $self->generate_jid($data),
82             $klass,
83             encode_json($data),
84             Time::HiRes::time,
85             $args{'delay'} || 0,
86             'priority', $args{'priority'} || 0,
87             'tags', encode_json($args{'tags'} || []),
88             'retries', $args{'retries'} || 5,
89             'depends', encode_json($args{'depends'} || []),
90             );
91             }
92              
93             =head2 C
94             =cut
95             sub recur {
96 0     0 1   my ($self, $klass, $data, @args) = @_;
97              
98 0           my $interval;
99             my %args;
100              
101 0 0         if (scalar(@args)%2) {
102 0           $interval = shift @args;
103 0           %args = @args;
104             }
105             else {
106 0           %args = @args;
107 0           $interval = $args{'interval'};
108             }
109              
110 0   0       return $self->{'client'}->_recur([], 'on', $self->{'name'},
      0        
      0        
      0        
      0        
111             $args{'jid'} || $self->generate_jid($data),
112             $klass,
113             encode_json($data),
114             Time::HiRes::time,
115             'interval', $interval, $args{'offset'} || 0,
116             'priority', $args{'priority'} || 0,
117             'tags', encode_json($args{'tags'} || []),
118             'retries', $args{'retries'} || 5,
119             );
120              
121             }
122              
123             =head2 C
124             =cut
125             sub pop {
126 0     0 1   my ($self, $count) = @_;
127 0   0       my $jobs = [ map { Qless::Job->new($self->{'client'}, decode_json($_)) }
  0            
128 0           @{ $self->{'client'}->_pop([$self->{'name'}], $self->{'worker_name'}, $count||1, Time::HiRes::time) } ];
129 0 0         if (!defined $count) {
130 0 0         return scalar @{ $jobs } ? $jobs->[0] : undef;
  0            
131             }
132              
133 0           return @{ $jobs };
  0            
134             }
135              
136             =head2 C
137             =cut
138             sub peek {
139 0     0 1   my ($self, $count) = @_;
140 0   0       my $jobs = [ map { Qless::Job->new($self->{'client'}, decode_json($_)) }
  0            
141 0           @{ $self->{'client'}->_peek([$self->{'name'}], $count||1, Time::HiRes::time) } ];
142 0 0         if (!defined $count) {
143 0 0         return scalar @{ $jobs } ? $jobs->[0] : undef;
  0            
144             }
145              
146 0           return @{ $jobs };
  0            
147             }
148              
149             =head2 C
150              
151             =cut
152              
153             sub stats {
154 0     0 1   my ($self, $date) = @_;
155 0   0       return decode_json($self->{'client'}->_stats([], $self->{'name'}, $date || Time::HiRes::time));
156             }
157              
158             =head2 C
159              
160             =cut
161              
162             sub length {
163 0     0 1   my ($self) = @_;
164              
165 0           my $redis = $self->{'client'}->{'redis'};
166 0           my $sum = 0;
167 0     0     my $sum_cb = sub { $sum += shift };
  0            
168              
169 0           $redis->zcard('ql:q:'.$self->{'name'}.'-locks', $sum_cb);
170 0           $redis->zcard('ql:q:'.$self->{'name'}.'-work', $sum_cb);
171 0           $redis->zcard('ql:q:'.$self->{'name'}.'-scheduled', $sum_cb);
172 0           $redis->wait_all_responses;
173              
174 0           return $sum;
175             }
176              
177             1;