line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# $Id: Mutex.pm,v 1.13 2010/03/27 19:56:34 dk Exp $ |
2
|
|
|
|
|
|
|
package IO::Lambda::Mutex; |
3
|
1
|
|
|
1
|
|
681
|
use vars qw($DEBUG @ISA); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
92
|
|
4
|
|
|
|
|
|
|
$DEBUG = $IO::Lambda::DEBUG{mutex} || 0; |
5
|
|
|
|
|
|
|
@ISA = qw(Exporter); |
6
|
|
|
|
|
|
|
@EXPORT_OK = qw(mutex); |
7
|
|
|
|
|
|
|
%EXPORT_TAGS = ( all => \@EXPORT_OK); |
8
|
|
|
|
|
|
|
|
9
|
1
|
|
|
1
|
|
4
|
use strict; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
21
|
|
10
|
1
|
|
|
1
|
|
3
|
use warnings; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
22
|
|
11
|
1
|
|
|
1
|
|
4
|
use IO::Lambda qw(:all); |
|
1
|
|
|
|
|
16
|
|
|
1
|
|
|
|
|
1707
|
|
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# Constructor: builds a mutex that starts out free and has no waiters queued.
#   taken - boolean-ish flag, 0 while the mutex is free
#   queue - array of waiter lambdas, in the order they asked for the mutex
sub new
{
    my $class = shift;
    my %state = (
        taken => 0,
        queue => [],
    );
    return bless \%state, $class;
}
20
|
|
|
|
|
|
|
|
21
|
6
|
|
|
6
|
1
|
1223
|
# Returns the raw 'taken' flag: true while the mutex is held.
sub is_taken
{
    my $self = $_[0];
    return $self-> {taken};
}
22
|
7
|
|
|
7
|
1
|
37
|
# Returns the logical negation of the 'taken' flag: true while nobody holds the mutex.
sub is_free
{
    my $self = $_[0];
    return !$self-> {taken};
}
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
# Non-blocking take.
# Returns 1 and marks the mutex as taken when it was free; returns 0 and
# changes nothing when it is already taken. Never waits.
sub take
{
    my $self = shift;

    # already owned by someone else - refuse
    return 0 if $self-> {taken};

    warn "$self is taken\n" if $DEBUG;
    $self-> {taken} = 1;
    return 1;
}
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
# Remove the lambda from the waiting queue.
# Only the first queue entry that is the same reference as $lambda is dropped.
# Returns 1 when an entry was removed, 0 when $lambda was not queued.
sub remove
{
    my ( $self, $lambda) = @_;
    my $queue = $self-> {queue};

    for my $idx ( 0 .. $#$queue) {
        # references are compared numerically, i.e. by identity
        next if $queue-> [$idx] != $lambda;
        splice( @$queue, $idx, 1);
        return 1;
    }

    warn "$self failed to remove $lambda from queue\n" if $DEBUG;
    return 0;
}
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# Returns a lambda that finishes once the caller owns the mutex.
#
#   $timeout - optional; passed to watch_timer() when defined. If the timer
#              fires first, the waiter is dropped from the queue and the
#              timer callback returns 'timeout'.
#
# When the mutex is free it is taken right away and a lambda that simply
# returns undef is handed back. Otherwise a new IO::Lambda object is blocked
# on a bind() record and appended to the queue; release() terminates it with
# undef when its turn comes.
sub waiter
{
    my ( $self, $timeout) = @_;

    # mutex is free, can take now
    if ( !$self-> {taken}) {
        $self-> take;
        return lambda { undef };
    }

    # mutex is not free, wait for it
    my $waiter = IO::Lambda-> new;

    # callback attached to the bind record; per the original comment it runs
    # when the waiter lambda is terminated
    my $on_terminate = sub {
        my ( $w, $rec) = splice( @_, 0, 2);

        # lambda was terminated, relinquish waiting and kill timeout
        if ( !$w-> {__already_removed}) {
            my $removed = $self-> remove($w);
            # NOTE(review): $self->{queue} is an array reference, so the
            # numeric comparison with 0 below can never be true and release()
            # is never reached from here. Left as is on purpose - confirm the
            # intent before changing it to a queue-length test.
            $self-> release if !$removed && 0 == $self-> {queue};
        }

        # at call time $timeout holds whatever watch_timer() returned
        $w-> cancel_event($timeout) if defined $timeout;
        return @_; # propagate error
    };
    my $bind = $waiter-> bind( $on_terminate);
    push @{ $self-> {queue} }, $waiter;

    if ( defined $timeout) {
        $timeout = $waiter-> watch_timer( $timeout, sub {
            $self-> remove($waiter);
            $waiter-> resolve($bind);
            return 'timeout';
        });
    }

    return $waiter;
}
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
# Gives the mutex up.
# Does nothing when the mutex is not taken. With an empty queue the mutex
# simply becomes free. Otherwise the mutex stays taken: the first queued
# waiter is dequeued, flagged with __already_removed (so its bind callback
# does not try to dequeue it again) and terminated with undef.
sub release
{
    my $self = shift;
    return unless $self-> {taken};

    my $queue = $self-> {queue};

    # nobody is waiting - just mark the mutex free
    if ( !@$queue) {
        warn "$self is free\n" if $DEBUG;
        $self-> {taken} = 0;
        return;
    }

    # hand ownership over to the waiter that has been queued the longest
    my $next = shift @$queue;
    warn "$self gives ownership to $next\n" if $DEBUG;
    $next-> {__already_removed} = 1;
    return $next-> terminate(undef);
}
102
|
|
|
|
|
|
|
|
103
|
0
|
|
|
0
|
|
0
|
# Destructor: terminates every waiter that is still queued, passing 'dead'
# as the error value.
#
# The loop runs over a snapshot of the queue. Terminating a waiter fires its
# bind callback, which calls remove() and splices the live queue; looping over
# the live array while it shrinks makes foreach skip waiters.
sub DESTROY
{
    my $self    = shift;
    my @pending = @{ $self-> {queue} || [] };
    for my $waiter ( @pending) {
        $waiter-> terminate('dead');
    }
}
|
0
|
|
|
|
|
0
|
|
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
# Condition-style wrapper over waiter().
# Reads ( $mutex, $timeout ) from the current lambda context, asks the mutex
# for a waiter lambda and registers the supplied block on it via condition().
# The (&) prototype is what allows the `mutex { ... }` block syntax.
sub mutex(&)
{
    my ( $self, $timeout) = context;
    my $waiter = $self-> waiter($timeout);
    return $waiter-> condition(shift, \&mutex, 'mutex');
}
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
# Wraps $lambda into a new lambda that first waits for the mutex (with the
# optional $timeout), then runs $lambda with the arguments the wrapper was
# called with, and finally releases the mutex, passing $lambda's results
# through.
#
# NOTE(review): the waiter's result (e.g. 'timeout') is not inspected in the
# first tail block, so $lambda runs and release() is called whatever the wait
# returned - confirm this is intended.
sub pipeline
{
    my ( $self, $lambda, $timeout) = @_;

    return lambda {
        my @args = @_;

        # step 1: wait until the mutex is ours
        context $self-> waiter($timeout);
        tail {
            # step 2: run the wrapped lambda with the original arguments
            context $lambda, @args;
            autocatch tail {
                # step 3: give the mutex up and forward the results
                $self-> release;
                return @_;
            }
        }
    };
}
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
1; |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
=pod |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
=head1 NAME |
131
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
IO::Lambda::Mutex - wait for a shared resource |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
=head1 DESCRIPTION |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
Objects of class C<IO::Lambda::Mutex> are mutexes that, like normal mutexes, |
137
|
|
|
|
|
|
|
can be taken and released. The mutexes allow lambdas to wait for their |
138
|
|
|
|
|
|
|
availability with method C<waiter>, which creates and returns a new lambda, |
139
|
|
|
|
|
|
|
that in turn will finish as soon as the caller can acquire the mutex. |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
=head1 SYNOPSIS |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
use IO::Lambda qw(:lambda); |
144
|
|
|
|
|
|
|
use IO::Lambda::Mutex qw(mutex); |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
my $mutex = IO::Lambda::Mutex-> new; |
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
# wait for mutex that shall be available immediately |
149
|
|
|
|
|
|
|
my $waiter = $mutex-> waiter; |
150
|
|
|
|
|
|
|
my $error = $waiter-> wait; |
151
|
|
|
|
|
|
|
die "error:$error" if $error; |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
# create and start a lambda that sleeps 2 seconds and then releases the mutex |
154
|
|
|
|
|
|
|
my $sleeper = lambda { |
155
|
|
|
|
|
|
|
context 2; |
156
|
|
|
|
|
|
|
timeout { $mutex-> release } |
157
|
|
|
|
|
|
|
}; |
158
|
|
|
|
|
|
|
$sleeper-> start; |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
# Create a new lambda that shall only wait for 0.5 seconds. |
161
|
|
|
|
|
|
|
# It will surely fail, since $sleeper is well, still sleeping |
162
|
|
|
|
|
|
|
lambda { |
163
|
|
|
|
|
|
|
context $mutex-> waiter(0.5); |
164
|
|
|
|
|
|
|
tail { |
165
|
|
|
|
|
|
|
my $error = shift; |
166
|
|
|
|
|
|
|
print $error ? "error:$error\n" : "ok\n"; |
167
|
|
|
|
|
|
|
# $error is expected to be 'timeout' |
168
|
|
|
|
|
|
|
} |
169
|
|
|
|
|
|
|
}-> wait; |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
# Again, wait for the same mutex but using different syntax. |
172
|
|
|
|
|
|
|
# This time should be ok - $sleeper will sleep for 1.5 seconds and |
173
|
|
|
|
|
|
|
# then the mutex will be available. |
174
|
|
|
|
|
|
|
lambda { |
175
|
|
|
|
|
|
|
context $mutex, 3; |
176
|
|
|
|
|
|
|
mutex { |
177
|
|
|
|
|
|
|
my $error = shift; |
178
|
|
|
|
|
|
|
print $error ? "error:$error\n" : "ok\n"; |
179
|
|
|
|
|
|
|
# expected to be 'ok' |
180
|
|
|
|
|
|
|
} |
181
|
|
|
|
|
|
|
}->wait; |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
# pipeline - manage a queue of lambdas, stuff new ones to it, guarantees |
184
|
|
|
|
|
|
|
# sequential execution: |
185
|
|
|
|
|
|
|
lambda { |
186
|
|
|
|
|
|
|
context |
187
|
|
|
|
|
|
|
$mutex-> pipeline( lambda { print 1 } ), |
188
|
|
|
|
|
|
|
$mutex-> pipeline( lambda { print 2 } ), |
189
|
|
|
|
|
|
|
$mutex-> pipeline( lambda { print 3 } ) |
190
|
|
|
|
|
|
|
; |
191
|
|
|
|
|
|
|
&tails(); |
192
|
|
|
|
|
|
|
}-> wait; |
193
|
|
|
|
|
|
|
# prints 123 guaranteedly in that order, even if intermediate lambdas sleep etc |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
=head1 API |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
=over |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
=item new |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
The constructor creates a new free mutex. |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
=item is_free |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
Returns boolean flag whether the mutex is free or not. |
206
|
|
|
|
|
|
|
Opposite of L</is_taken>. |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
=item is_taken |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
Returns boolean flag whether the mutex is taken or not. |
211
|
|
|
|
|
|
|
Opposite of L</is_free>. |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
=item take |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
Attempts to take the mutex. If the mutex is free, the operation |
216
|
|
|
|
|
|
|
is successful and true value is returned. Otherwise, the operation |
217
|
|
|
|
|
|
|
is failed and false value is returned. |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=item release |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
Tries to release the taken mutex. If there are lambdas waiting (see L</waiter>) |
222
|
|
|
|
|
|
|
in the queue, the first lambda will be terminated, and thus whoever waits for |
223
|
|
|
|
|
|
|
the lambda can be notified; it will be up to the code associated with the |
224
|
|
|
|
|
|
|
waiter lambda to call C<release> again. If there are no waiters in the queue, |
225
|
|
|
|
|
|
|
the mutex is set free. |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
=item waiter($timeout = undef) :: () -> error |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
Creates a new lambda, that is finished when the mutex becomes available. |
230
|
|
|
|
|
|
|
The lambda is inserted into the internal waiting queue. It takes as |
231
|
|
|
|
|
|
|
many calls to C<release> as there are lambdas in the queue until the mutex |
232
|
|
|
|
|
|
|
becomes free. The lambda returns an error flag, which is C<undef> if |
233
|
|
|
|
|
|
|
the mutex was acquired successfully, or the error string. |
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
If C<$timeout> is defined, and by the time it is expired the mutex |
236
|
|
|
|
|
|
|
could not be obtained, the lambda is removed from the queue, and |
237
|
|
|
|
|
|
|
returned error value is 'timeout'. The mutex state is then unchanged. |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
If C succeeds, a C call is issued. Thus, if the next |
240
|
|
|
|
|
|
|
waiter awaits for the mutex, it will be notified; otherwise the mutex |
241
|
|
|
|
|
|
|
becomes free. |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
=item pipeline($lambda, $timeout = undef) |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
Creates a new lambda, that wraps over C<$lambda> so that it is executed |
246
|
|
|
|
|
|
|
after mutex had been obtained. Also, as soon as C<$lambda> is finished, |
247
|
|
|
|
|
|
|
the mutex is released, thus allowing others to take it. |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
=item remove($lambda) |
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
Internal function, do not use directly, use C<< $lambda-> terminate >> |
252
|
|
|
|
|
|
|
instead. |
253
|
|
|
|
|
|
|
|
254
|
|
|
|
|
|
|
Removes the lambda created previously by waiter() from internal queue. Note |
255
|
|
|
|
|
|
|
that after that operation the lambda will never finish by itself. |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=item mutex($mutex, $timeout = undef) -> error |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
Condition wrapper over C<waiter>. |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=back |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
=head1 AUTHOR |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>. |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
=cut |