File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Queue.pm
Criterion Covered Total %
statement 47 47 100.0
branch 10 12 83.3
condition 4 6 66.6
subroutine 12 12 100.0
pod 3 3 100.0
total 76 80 95.0


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Queue;
2 38     38   274 use Mojo::Base -base;
  38         76  
  38         424  
3 38     38   6430 use Mojo::IOLoop::ReadWriteProcess::Pool;
  38         93  
  38         1484  
4 38     38   243 use Mojo::IOLoop::ReadWriteProcess;
  38         79  
  38         496  
5 38     38   20038 use Mojo::IOLoop::ReadWriteProcess::Session;
  38         94  
  38         2183  
6              
7 38     38   278 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  38         71  
  38         35811  
8              
9             has queue => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
10             has pool => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
11             has done => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
12             has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
13              
14             sub _dequeue {
15 40     40   94 my ($self, $process) = @_;
16              
17 40     75   144 $self->pool($self->pool->grep(sub { $process ne $_ }));
  75         1257  
18 40 100 66     1081 shift @{$self->queue}
  32         247  
19             if ($self->queue->first && $self->pool->add($self->queue->first));
20             }
21              
22 436 100   436 1 1164 sub exhausted { $_[0]->pool->size == 0 && shift->queue->size == 0 }
23              
24             sub consume {
25 4     4 1 11 my $self = shift;
26 4         31 $self->session->enable;
27 4         28 $self->done->maximum_processes(
28             $self->queue->maximum_processes + $self->pool->maximum_processes);
29 4         15 until ($self->exhausted) {
30 432         37926 sleep .5;
31 432         2494 $self->session->consume_collected_info;
32             $self->session->_protect(
33             sub {
34             $self->pool->each(
35             sub {
36 544         7883 my $p = shift;
37 544 50       994 return unless $p;
38 544 100       1409 return if exists $p->{started};
39 40         366 $p->{started}++;
40 40         1179 $p->once(stop => sub { $self->done->add($p); $self->_dequeue($p) });
  40         4337  
  40         306  
41 40         2153 $p->start;
42 432     432   1230 });
43 432         2528 });
44             }
45             }
46              
47             sub add {
48 40     40 1 808 my $self = shift;
49 40   66     77 $self->pool->add(@_) // $self->queue->add(@_);
50             }
51              
52             sub AUTOLOAD {
53 4     4   50 our $AUTOLOAD;
54 4         11 my $fn = $AUTOLOAD;
55 4         67 $fn =~ s/.*:://;
56 4 100       24 return if $fn eq "DESTROY";
57 3         7 my $self = shift;
58             return (
59 3         11 eval { $self->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) },
60             (grep(/once|on|emit/, $fn))
61 3 50       6 ? eval { $self->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) }
  3         26  
62             : ());
63             }
64              
65             1;
66              
67             =encoding utf-8
68              
69             =head1 NAME
70              
71             Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects.
72              
73             =head1 SYNOPSIS
74              
75             use Mojo::IOLoop::ReadWriteProcess qw(queue process);
76             my $n_proc = 20;
77             my $fired;
78              
79             my $q = queue;
80              
81             $q->pool->maximum_processes(2); # Max 2 processes in parallel
82             $q->queue->maximum_processes(10); # Max queue is 10
83              
84             $q->add( process sub { return 42 } ) for 1..7;
85              
86             # Subscribe to all "stop" events in the pool
87             $q->once(stop => sub { $fired++; });
88              
89             # Consume the queue
90             $q->consume();
91              
92             my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess
93              
94             # Set your own running pool
95             $q->pool(parallel sub { return 42 } => 5);
96              
97             # Set your own queue
98             $q->queue(parallel sub { return 42 } => 20);
99              
100             $q->consume();
101              
102             =head1 METHODS
103              
104             L<Mojo::IOLoop::ReadWriteProcess::Queue> inherits all methods from L<Mojo::Base> and implements
105             the following new ones.
106             Note: It proxies all the other methods of L<Mojo::IOLoop::ReadWriteProcess> for the whole process group.
107              
108             =head2 add
109              
110             use Mojo::IOLoop::ReadWriteProcess qw(queue process);
111             my $q = queue();
112             $q->add(sub { print "Hello 2! " });
113             $q->add(process sub { print "Hello 2! " });
114              
115             Add the process to the queue.
116              
117             =head2 consume
118              
119             use Mojo::IOLoop::ReadWriteProcess qw(queue);
120             my $q = queue();
121             $q->add(sub { print "Hello 2! " });
122             $q->add(process sub { print "Hello 2! " });
123             $q->consume; # executes and exhausts the processes
124              
125             Starts the processes and empties the queue.
126             Note: maximum_processes can be set both for the pool (number of processes to be run in parallel),
127             and for the queue (that gets exhausted during the C<consume> phase).
128              
129             $q->pool->maximum_processes(2); # Max 2 processes in parallel
130             $q->queue->maximum_processes(10); # Max queue is 10
131              
132             =head2 exhausted
133              
134             use Mojo::IOLoop::ReadWriteProcess qw(queue);
135             my $q = queue();
136             $q->add(sub { print "Hello 2! " });
137             $q->add(process sub { print "Hello 2! " });
138             $q->consume; # executes and exhausts the processes
139             $q->exhausted; # 1
140              
141             Returns 1 if the queue is exhausted.
142              
143             =head1 ENVIRONMENT
144              
145             You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify
146             the maximum number of processes allowed in the pool and the queue, that are
147             L<Mojo::IOLoop::ReadWriteProcess::Pool> instances.
148              
149             MOJO_PROCESS_MAXIMUM_PROCESSES=10000
150              
151             =head1 LICENSE
152              
153             Copyright (C) Ettore Di Giacinto.
154              
155             This library is free software; you can redistribute it and/or modify
156             it under the same terms as Perl itself.
157              
158             =head1 AUTHOR
159              
160             Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>
161              
162             =cut