File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Queue.pm
Criterion Covered Total %
statement 46 46 100.0
branch 10 12 83.3
condition 4 6 66.6
subroutine 12 12 100.0
pod 3 3 100.0
total 75 79 94.9


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Queue;
2 38     38   254 use Mojo::Base -base;
  38         68  
  38         276  
3 38     38   5937 use Mojo::IOLoop::ReadWriteProcess::Pool;
  38         76  
  38         1573  
4 38     38   319 use Mojo::IOLoop::ReadWriteProcess;
  38         73  
  38         386  
5 38     38   18589 use Mojo::IOLoop::ReadWriteProcess::Session;
  38         82  
  38         2001  
6              
7 38     38   285 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  38         73  
  38         34230  
8              
9             has queue => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
10             has pool => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
11             has done => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
12             has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
13              
14             sub _dequeue {
15 40     40   111 my ($self, $process) = @_;
16              
17 40     75   104 $self->pool($self->pool->grep(sub { $process ne $_ }));
  75         1147  
18 40 100 66     1361 shift @{$self->queue}
  32         222  
19             if ($self->queue->first && $self->pool->add($self->queue->first));
20             }
21              
22 311 100   311 1 776 sub exhausted { $_[0]->pool->size == 0 && shift->queue->size == 0 }
23              
24             sub consume {
25 4     4 1 15 my $self = shift;
26 4         15 $self->session->enable;
27 4         18 $self->done->maximum_processes(
28             $self->queue->maximum_processes + $self->pool->maximum_processes);
29 4         14 until ($self->exhausted) {
30 307         30475 sleep .5;
31             $self->session->_protect(
32             sub {
33             $self->pool->each(
34             sub {
35 384         5899 my $p = shift;
36 384 50       742 return unless $p;
37 384 100       1184 return if exists $p->{started};
38 40         230 $p->{started}++;
39 40         1171 $p->once(stop => sub { $self->done->add($p); $self->_dequeue($p) });
  40         4680  
  40         334  
40 40         1863 $p->start;
41 307     307   757 });
42 307         2210 });
43             }
44             }
45              
46             sub add {
47 40     40 1 887 my $self = shift;
48 40   66     68 $self->pool->add(@_) // $self->queue->add(@_);
49             }
50              
51             sub AUTOLOAD {
52 4     4   71 our $AUTOLOAD;
53 4         9 my $fn = $AUTOLOAD;
54 4         43 $fn =~ s/.*:://;
55 4 100       22 return if $fn eq "DESTROY";
56 3         7 my $self = shift;
57             return (
58 3         14 eval { $self->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) },
59             (grep(/once|on|emit/, $fn))
60 3 50       17 ? eval { $self->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) }
  3         14  
61             : ());
62             }
63              
64             1;
65              
66             =encoding utf-8
67              
68             =head1 NAME
69              
70             Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects.
71              
72             =head1 SYNOPSIS
73              
74             use Mojo::IOLoop::ReadWriteProcess qw(queue process);
75             my $n_proc = 20;
76             my $fired;
77              
78             my $q = queue;
79              
80             $q->pool->maximum_processes(2); # Max 2 processes in parallel
81             $q->queue->maximum_processes(10); # Max queue is 10
82              
83             $q->add( process sub { return 42 } ) for 1..7;
84              
85             # Subscribe to all "stop" events in the pool
86             $q->once(stop => sub { $fired++; });
87              
88             # Consume the queue
89             $q->consume();
90              
91             my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess
92              
93             # Set your own running pool
94             $q->pool(parallel sub { return 42 } => 5);
95              
96             # Set your own queue
97             $q->queue(parallel sub { return 42 } => 20);
98              
99             $q->consume();
100              
101             =head1 METHODS
102              
103             L inherits all methods from L and implements
104             the following new ones.
105             Note: It proxies all the other methods of L for the whole process group.
106              
107             =head2 add
108              
109             use Mojo::IOLoop::ReadWriteProcess qw(queue process);
110             my $q = queue();
111             $q->add(sub { print "Hello 2! " });
112             $q->add(process sub { print "Hello 2! " });
113              
114             Add the process to the queue.
115              
116             =head2 consume
117              
118             use Mojo::IOLoop::ReadWriteProcess qw(queue);
119             my $q = queue();
120             $q->add(sub { print "Hello 2! " });
121             $q->add(process sub { print "Hello 2! " });
122             $q->consume; # executes and exhaust the processes
123              
124             Starts the processes and empties the queue.
125             Note: maximum_processes can be set both to the pool (number of process to be run in parallel),
126             and for the queue (that gets exhausted during the C phase).
127              
128             $q->pool->maximum_processes(2); # Max 2 processes in parallel
129             $q->queue->maximum_processes(10); # Max queue is 10
130              
131             =head2 exhausted
132              
133             use Mojo::IOLoop::ReadWriteProcess qw(queue);
134             my $q = queue();
135             $q->add(sub { print "Hello 2! " });
136             $q->add(process sub { print "Hello 2! " });
137             $q->consume; # executes and exhaust the processes
138             $q->exhausted; # 1
139              
140             Returns 1 if the queue is exhausted.
141              
142             =head1 ENVIRONMENT
143              
144             You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify the
145             the maximum number of processes allowed in the pool and the queue, that are
146             L instances.
147              
148             MOJO_PROCESS_MAXIMUM_PROCESSES=10000
149              
150             =head1 LICENSE
151              
152             Copyright (C) Ettore Di Giacinto.
153              
154             This library is free software; you can redistribute it and/or modify
155             it under the same terms as Perl itself.
156              
157             =head1 AUTHOR
158              
159             Ettore Di Giacinto Eedigiacinto@suse.comE
160              
161             =cut