File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Pool.pm
Criterion Covered Total %
statement 36 36 100.0
branch 10 10 100.0
condition 1 2 50.0
subroutine 11 11 100.0
pod 5 5 100.0
total 63 64 98.4


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Pool;
2 38     38   319 use Mojo::Base 'Mojo::Collection';
  38         78  
  38         241  
3 38   50 38   6566 use constant MAXIMUM_PROCESSES => $ENV{MOJO_PROCESS_MAXIMUM_PROCESSES} // 100;
  38         218  
  38         3024  
4 38     38   286 use Scalar::Util qw(blessed);
  38         110  
  38         22891  
5              
6             my %max_proc;
7              
8             sub new {
9 57     57 1 416 my $s = shift->SUPER::new(@_);
10 57         1198 $max_proc{$s} = MAXIMUM_PROCESSES;
11 57         429 $s;
12             }
13              
14 5     5 1 22 sub get { @{$_[0]}[$_[1]] }
  5         59  
15 1     1 1 10 sub remove { delete @{$_[0]}[$_[1]] }
  1         7  
16              
17             sub add {
18 188 100   188 1 3512 return undef unless $_[0]->size < $max_proc{$_[0]};
19 154         1338 my $self = shift;
20 154 100       223 push @{$self},
  154         839  
21             blessed $_[0] ? $_[0] : Mojo::IOLoop::ReadWriteProcess->new(@_);
22 154         881 $self->last;
23             }
24              
25             sub maximum_processes {
26 29 100   29 1 3020 $max_proc{$_[0]} = pop() if $_[1];
27 29         191 $max_proc{$_[0]};
28             }
29              
30             sub _cmd {
31 20     20   77 my $c = shift;
32 20         59 my $f = pop;
33 20         66 my @args = @_;
34 20         33 my @r;
35 20     81   202 $c->each(sub { push(@r, +shift()->$f(@args)) });
  81         1602  
36 20 100       916 wantarray ? @r : $c;
37             }
38              
39             sub AUTOLOAD {
40 71     71   49778 our $AUTOLOAD;
41 71         156 my $fn = $AUTOLOAD;
42 71         638 $fn =~ s/.*:://;
43 71 100       711 return if $fn eq "DESTROY";
44 14         29 return eval { shift->_cmd(@_, $fn) };
  14         53  
45             }
46              
47             1;
48              
49             =encoding utf-8
50              
51             =head1 NAME
52              
53             Mojo::IOLoop::ReadWriteProcess::Pool - Pool of Mojo::IOLoop::ReadWriteProcess objects.
54              
55             =head1 SYNOPSIS
56              
57             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
58             my $n_proc = 20;
59             my $fired;
60              
61             my $p = parallel sub { print "Hello world\n"; } => $n_proc;
62              
63             # Subscribe to all "stop" events in the pool
64             $p->once(stop => sub { $fired++; });
65              
66             # Start all processes belonging to the pool
67             $p->start();
68              
69             # Receive the process output
70             $p->each(sub { my $p = shift; $p->getline(); });
71             $p->wait_stop;
72              
73             # Get the last one! (it's a Mojo::Collection!)
74             $p->last()->stop();
75              
76             =head1 METHODS
77              
78             L inherits all methods from L and implements
79             the following new ones.
80             Note: It proxies all the other methods of L for the whole process group.
81              
82             =head2 get
83              
84             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
85             my $pool = parallel(sub { print "Hello" } => 5);
86             $pool->get(4);
87              
88             Get the element specified in the pool (starting from 0).
89              
90             =head2 add
91              
92             use Mojo::IOLoop::ReadWriteProcess qw(pool);
93             my $pool = pool(maximum_processes => 2);
94             $pool->add(sub { print "Hello 2! " });
95              
96             Add the element specified in the pool.
97              
98             =head2 remove
99              
100             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
101             my $pool = parallel(sub { print "Hello" } => 5);
102             $pool->remove(4);
103              
104             Remove the element specified in the pool.
105              
106             =head2 maximum_processes
107              
108             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
109             my $pool = parallel(sub { print "Hello" } => 5);
110             $pool->maximum_processes(30);
111             $pool->add(...);
112              
113             Prevent from adding processes to the pool. If we reach C number
114             of processes, C will refuse to add more to the pool.
115              
116             =head1 ENVIRONMENT
117              
118             You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify the
119             the maximum number of processes allowed in L instances.
120              
121             MOJO_PROCESS_MAXIMUM_PROCESSES=10000
122              
123             =head1 LICENSE
124              
125             Copyright (C) Ettore Di Giacinto.
126              
127             This library is free software; you can redistribute it and/or modify
128             it under the same terms as Perl itself.
129              
130             =head1 AUTHOR
131              
132             Ettore Di Giacinto Eedigiacinto@suse.comE
133              
134             =cut