File Coverage

blib/lib/KeyedMutex.pm
Criterion Covered Total %
statement 72 87 82.7
branch 21 40 52.5
condition 3 9 33.3
subroutine 18 19 94.7
pod 4 5 80.0
total 118 160 73.7


line stmt bran cond sub pod time code
1             package KeyedMutex;
2              
3 4     4   87775 use strict;
  4         10  
  4         150  
4 4     4   21 use warnings;
  4         11  
  4         142  
5              
6 4     4   24 use Digest::MD5 qw/md5/;
  4         16  
  4         403  
7 4     4   4419 use IO::Socket::INET;
  4         2214203  
  4         32  
8 4     4   2824 use IO::Socket::UNIX;
  4         9  
  4         31  
9 4     4   15347 use POSIX qw/:errno_h/;
  4         40933  
  4         27  
10 4     4   7581 use Socket qw/IPPROTO_TCP TCP_NODELAY/;
  4         10  
  4         1140  
11              
12 4     4   2397 use KeyedMutex::Lock;
  4         9  
  4         256  
13              
14             package KeyedMutex;
15              
16             our $VERSION = '0.06';
17              
18             my $MSG_NOSIGNAL = 0;
19             eval {
20             $MSG_NOSIGNAL = Socket::MSG_NOSIGNAL;
21             };
22              
23 4     4   19 use constant DEFAULT_SOCKPATH => '/tmp/keyedmutexd.sock';
  4         7  
  4         235  
24 4     4   19 use constant KEY_SIZE => 16;
  4         8  
  4         5432  
25              
26             sub new {
27 6     6 0 15006332 my ($klass, $opts) = @_;
28 6   33     278 $klass = ref($klass) || $klass;
29 6   50     57 $opts ||= {};
30 6 50 50     140 my $self = bless {
31             sock => undef,
32             locked => undef,
33             auto_reconnect =>
34             defined $opts->{auto_reconnect} ? $opts->{auto_reconnect} : 1,
35             _peer => $opts->{sock} || DEFAULT_SOCKPATH,
36             }, $klass;
37 6         63 $self->_connect();
38 4         3375 $self;
39             }
40              
41             sub DESTROY {
42 6     6   909 my $self = shift;
43 6 100       68 $self->{sock}->close if $self->{sock};
44             }
45              
46             sub locked {
47 7     7 1 296 my $self = shift;
48 7         40 $self->{locked};
49             }
50              
51             sub auto_reconnect {
52 0     0 1 0 my $self = shift;
53 0 0       0 $self->{auto_reconnect} = shift if @_;
54 0         0 $self->{auto_reconnect};
55             }
56              
57             sub lock {
58 6     6 1 3236 my ($self, $key, $use_raii) = @_;
59            
60             # check state
61 6 50       124 die "already holding a lock\n" if $self->{locked};
62            
63             # send key
64 6         57 my $hashed_key = md5($key);
65 6 100       174 $self->_connect(1) unless $self->{sock};
66 6 50       640 unless ($self->_send($hashed_key, KEY_SIZE)) {
67 0         0 $self->_connect(1);
68 0 0       0 $self->_send($hashed_key, KEY_SIZE)
69             or die 'communication error';
70             }
71             # wait for response
72 6         16 my $res;
73 6         88 while ($self->{sock}->sysread($res, 1) != 1) {
74 0 0       0 if ($! != EINTR) {
75 0         0 $self->{sock}->close;
76 0         0 $self->{sock} = undef;
77 0         0 $res = 'R';
78 0         0 last;
79             }
80             }
81 6 50       2634 return unless $res eq 'O';
82 6         21 $self->{locked} = 1;
83 6 100       92 return $use_raii ? KeyedMutex::Lock->_new($self) : 1;
84             }
85              
86             sub release {
87 6     6 1 24 my ($self) = @_;
88            
89             # check state
90 6 50       28 die "not holding a lock\n" unless $self->{locked};
91            
92 6 100       24 unless ($self->_send('R', 1)) {
93 1         25 $self->{sock}->close;
94 1         86 $self->{sock} = undef;
95             }
96 6         27 $self->{locked} = undef;
97 6         33 1;
98             }
99              
100             sub _connect {
101 7     7   19 my ($self, $is_reconnect) = @_;
102            
103 7 100       27 if ($is_reconnect) {
104 1 50       5 die 'communication error' unless $self->{auto_reconnect};
105 1 50       6 if ($self->{sock}) {
106 0         0 $self->{sock}->close;
107 0         0 $self->{sock} = undef;
108             }
109             }
110            
111 7 50       2494 if ($self->{_peer} =~ /^(?:|(.*):)(\d+)$/) {
112 0   0     0 my ($host, $port) = ($1 || '127.0.0.1', $2);
113 0 0       0 $self->{sock} = IO::Socket::INET->new(
114             PeerHost => $host,
115             PeerPort => $port,
116             Proto => 'tcp',
117             ) or die 'failed to connect to keyedmutexd';
118 0 0       0 setsockopt($self->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)
119             or die 'failed to set TCP_NODELAY';
120             } else {
121 7 100       176 $self->{sock} = IO::Socket::UNIX->new(
122             Type => SOCK_STREAM,
123             Peer => $self->{_peer},
124             ) or die 'failed to connect to keyedmutexd';
125             }
126             }
127              
128             sub _send {
129 12     12   33 my ($self, $data, $size) = @_;
130 12 50       50 local $SIG{PIPE} = 'IGNORE' unless $MSG_NOSIGNAL;
131 12         22 my $ret = undef;
132 12         68 eval {
133 4     4   36 no warnings;
  4         7  
  4         347  
134 12         129 $ret = $self->{sock}->send($data, $MSG_NOSIGNAL) == $size;
135             };
136 12         814 $ret;
137             }
138              
139             1;
140              
141             __END__
142              
143             =head1 NAME
144              
145             KeyedMutex - An interprocess keyed mutex
146              
147             =head1 SYNOPSIS
148              
149             # start server
150             % keyedmutexd >/dev/null &
151            
152             use KeyedMutex;
153            
154             my $km = KeyedMutex->new;
155            
156             until ($value = $cache->get($key)) {
157             if (my $lock = $km->lock($key, 1)) {
158             #locked read from DB
159             $value = get_from_db($key);
160             $cache->set($key, $value);
161             last;
162             }
163             }
164              
165             =head1 DESCRIPTION
166              
167             C<KeyedMutex> is an interprocess keyed mutex. Its intended use is to prevent sending identical requests to database servers at the same time. By using C<KeyedMutex>, only a single client would send a request to the database, and others can retrieve the result from a shared cache (namely memcached or Cache::Swifty) instead.
168              
169             =head1 THE CONSTRUCTOR
170              
171             Following parameters are recognized.
172              
173             =head2 sock
174              
175             Optional. Path to a unix domain socket, or a TCP address given as C<port> or C<host:port> (the host defaults to 127.0.0.1), on which C<keyedmutexd> is running. Defaults to /tmp/keyedmutexd.sock.
176              
177             =head2 auto_reconnect
178              
179             Optional. Whether or not to automatically reconnect to server on communication failure. Default is on.
180              
181             =head1 METHODS
182              
183             =head2 lock($key, [ use_raii ])
184              
185             Tries to obtain a mutex lock for given key.
186             When the use_raii flag is not set (or is omitted), the method returns 1 if successful, or undef if not. If successful, the client should later release the lock by calling C<release>. A return value of undef means that some other client that held the lock has released it.
187             When the use_raii flag is set, the method returns a C<KeyedMutex::Lock> object when successful. The lock is automatically released when the lock object is destroyed.
188              
189             =head2 release
190              
191             Releases the lock acquired by a procedural-style lock (i.e. use_raii flag not being set).
192              
193             =head2 locked
194              
195             Returns whether the object is currently holding a lock.
196              
197             =head2 auto_reconnect
198              
199             Sets or retrieves auto_reconnect flag.
200              
201             =head1 SEE ALSO
202              
203             http://labs.cybozu.co.jp/blog/kazuhoatwork/
204              
205             =head1 AUTHOR
206              
207             Copyright (c) 2007 Cybozu Labs, Inc. All rights reserved.
208              
209             written by Kazuho Oku E<lt>kazuhooku@gmail.comE<gt>
210              
211             =head1 LICENSE
212              
213             This program is free software; you can redistribute it and/or modify it under the
214             same terms as Perl itself.
215              
216             See http://www.perl.com/perl/misc/Artistic.html
217              
218             =cut