File Coverage

blib/lib/Future/Mutex.pm
Criterion Covered Total %
statement 34 34 100.0
branch 7 8 87.5
condition 2 2 100.0
subroutine 8 8 100.0
pod 3 3 100.0
total 54 55 98.1


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2016-2020 -- leonerd@leonerd.org.uk
5              
6             package Future::Mutex;
7              
8 1     1   417 use v5.10;
  1         4  
9 1     1   5 use strict;
  1         2  
  1         30  
10 1     1   6 use warnings;
  1         2  
  1         50  
11              
12             our $VERSION = '0.50';
13              
14 1     1   6 use Future;
  1         8  
  1         339  
15              
16             =head1 NAME
17              
18             C<Future::Mutex> - mutual exclusion lock around code that returns L<Future>s
19              
20             =head1 SYNOPSIS
21              
22             use Future::Mutex;
23              
24             my $mutex = Future::Mutex->new;
25              
26             sub do_atomically
27             {
28             return $mutex->enter( sub {
29             ...
30             return $f;
31             });
32             }
33              
34             =head1 DESCRIPTION
35              
36             Most L<Future>-using code expects to run with some level of concurrency, using
37             future instances to represent still-pending operations that will complete at
38             some later time. There are occasions however, when this concurrency needs to
39             be restricted - some operations that, once started, must not be interrupted
40             until they are complete. Subsequent requests to perform the same operation
41             while one is still outstanding must therefore be queued to wait until the
42             first is finished. These situations call for a mutual-exclusion lock, or
43             "mutex".
44              
45             A C<Future::Mutex> instance provides one basic operation, which will execute a
46             given block of code which returns a future, and itself returns a future to
47             represent that. The mutex can be in one of two states; either unlocked or
48             locked. While it is unlocked, requests to execute code are handled
49             immediately. Once a block of code is invoked, the mutex is now considered to
50             be locked, causing any subsequent requests to invoke code to be queued behind
51             the first one, until it completes. Once the initial code indicates completion
52             (by its returned future providing a result or failing), the next queued code
53             is invoked.
54              
55             An instance may also be a counting mutex if initialised with a count greater
56             than one. In this case, it can keep multiple blocks outstanding up to that
57             limit, with subsequent requests queued as before. This allows it to act as a
58             concurrency-bounding limit around some operation that can run concurrently,
59             but an application wishes to apply overall limits to stop it growing too much,
60             such as communications with external services or executing other programs.
61              
62             =cut
63              
64             =head1 CONSTRUCTOR
65              
66             =cut
67              
68             =head2 new
69              
70             $mutex = Future::Mutex->new( count => $n )
71              
72             Returns a new C<Future::Mutex> instance. It is initially unlocked.
73              
74             Takes the following named arguments:
75              
76             =over 8
77              
78             =item count => INT
79              
80             Optional number to limit outstanding concurrency. Will default to 1 if not
81             supplied.
82              
83             =back
84              
85             =cut
86              
87             sub new
88             {
89 10     10 1 2439 my $class = shift;
90 10         22 my %params = @_;
91              
92             return bless {
93 10   100     76 avail => $params{count} // 1,
94             waitf => undef,
95             queue => [],
96             }, $class;
97             }
98              
99             =head1 METHODS
100              
101             =cut
102              
103             =head2 enter
104              
105             $f = $mutex->enter( \&code )
106              
107             Returns a new C<Future> that represents the eventual result of calling the
108             code. If the mutex is currently unlocked, the code will be invoked
109             immediately. If it is currently locked, the code will be queued waiting for
110             the next time it becomes unlocked.
111              
112             The code is invoked with no arguments, and is expected to return a C<Future>.
113             The eventual result of that future determines the result of the future that
114             C<enter> returned.
115              
116             =cut
117              
118             sub enter
119             {
120 17     17 1 111 my $self = shift;
121 17         28 my ( $code ) = @_;
122              
123 17         25 my $down_f;
124 17 100       34 if( $self->{avail} ) {
125 11         18 $self->{avail}--;
126 11         40 $down_f = Future->done;
127             }
128             else {
129 6 50       15 die "ARGH Need to clone an existing future\n" unless defined $self->{waitf};
130 6         7 push @{ $self->{queue} }, $down_f = $self->{waitf}->new;
  6         15  
131             }
132              
133             my $up = sub {
134 17 100   17   22 if( my $next_f = shift @{ $self->{queue} } ) {
  17         47  
135 6         17 $next_f->done;
136             }
137             else {
138 11         14 $self->{avail}++;
139 11         38 undef $self->{waitf};
140             }
141 17         63 };
142              
143 17         74 my $retf = $down_f->then( $code )->on_ready( $up );
144 17 100       39 $self->{waitf} or $self->{waitf} = $retf;
145 17         136 return $retf;
146             }
147              
148             =head2 available
149              
150             $avail = $mutex->available
151              
152             Returns true if the mutex is currently unlocked, or false if it is locked.
153              
154             =cut
155              
156             sub available
157             {
158 12     12 1 1002 my $self = shift;
159 12         56 return $self->{avail};
160             }
161              
162             =head1 AUTHOR
163              
164             Paul Evans
165              
166             =cut
167              
168             0x55AA;