File Coverage

blib/lib/Memcached/Server.pm
Criterion Covered Total %
statement 144 175 82.2
branch 88 128 68.7
condition 22 36 61.1
subroutine 21 28 75.0
pod 5 5 100.0
total 280 372 75.2


line stmt bran cond sub pod time code
1             package Memcached::Server;
2              
3 3     3   53345 use warnings;
  3         8  
  3         95  
4 3     3   18 use strict;
  3         8  
  3         168  
5              
6             =head1 NAME
7              
8             Memcached::Server - A pure perl Memcached server helper, that help you create a server speaking Memcached protocol
9              
10             =head1 VERSION
11              
12             Version 0.04
13              
14             =cut
15              
16             our $VERSION = '0.04';
17              
18 3     3   3607 use AnyEvent::Socket;
  3         147054  
  3         537  
19 3     3   5049 use AnyEvent::Handle;
  3         29882  
  3         145  
20 3     3   2956 use Hash::Identity qw(e);
  3         4906  
  3         20  
21 3     3   2382 use callee;
  3         15798  
  3         4744  
22              
23             =head1 SYNOPSIS
24              
25             # running as a stand alone server
26             use Memcached::Server;
27             my $server = Memcached::Server->new(
28             no_extra => 0 / 1, # if set to true, then the server will skip cas, expire, flag;
29             # thus, cas always success, never expire, flag remains 0 forever.
30             # with this option on, one can get a entry that hasn't been set,
31             # as long as your 'get' and '_find' say yes.
32             open => [[0, 8888], ['127.0.0.1', 8889], ['10.0.0.5', 8889], [$host, $port], ...],
33             cmd => { # customizable handlers
34             _find => sub {
35             my($cb, $key, $client) = @_;
36             ...
37             $cb->(0); # not found
38             ... or ...
39             $cb->(1); # found
40             },
41             get => sub {
42             my($cb, $key, $client) = @_;
43             ...
44             $cb->(0); # not found
45             ... or ...
46             $cb->(1, $data); # found
47             },
48             set => sub {
49             my($cb, $key, $flag, $expire, $value, $client) = @_;
50             ...
51             $cb->(1); # success
52             ... or ...
53             $cb->(-1, $error_message); # error occured, but keep the connection to accept next commands.
54             ... or ...
55             $cb->(-2, $error_message); # error occured, and close the connection immediately.
56             },
57             delete => sub {
58             my($cb, $key, $client) = @_;
59             ...
60             $cb->(0); # not found
61             ... or ...
62             $cb->(1); # success
63             },
64             flush_all => sub {
65             my($cb, $client) = @_;
66             ...
67             $cb->();
68             },
69             _begin => sub { # called when a client is accepted or assigned by 'serve' method (optional)
70             my($cb, $client) = @_;
71             ...
72             $cb->();
73             },
74             _end => sub { # called when a client disconnects (optional)
75             my($cb, $client) = @_;
76             ...
77             $cb->();
78             },
79             # NOTE: the $client, a AnyEvent::Handle object, is presented for keeping per connection information by using it as a hash key.
80             # it's not recommended to read or write to this object directly, that might break the protocol consistency.
81             }
82             );
83             ...
84             $server->open($host, $port); # open one more listening address
85             $server->close($host, $port); # close a listening address
86             $server->close_all; # close all the listening addresses
87             $server->serve($file_handle); # assign an accepted client socket to the server manually
88              
89             =head1 DESCRIPTION
90              
91             This module help us to create a pure perl Memcached server.
92             It take care some protocol stuff, so that we can only focus on primary functions.
93              
94             Take a look on the source of L, a compelete example that
95             works as a standard Memcached server, except it's pure perl implemented.
96              
97             =head1 SUBROUTINES/METHODS
98              
99             =head2 $server = Memcached::Server->new( cmd => ..., open => ... );
100              
101             Create a Memcached::Server with parameters 'cmd' (required) and 'open' (optional).
102              
103             The parameter 'cmd' is provided to control the behaviors, that should be prepared
104             and assigned at the initial time.
105              
106             The parameter 'open' is provided to assign a list of listening hosts/ports.
107             Each of the list is passed to L directly,
108             so you can use IPv4, IPv6, and also unix sockets.
109              
110             If you don't provide 'open' here, you can provide it later by member method 'open'.
111              
112             =cut
113              
114             sub new {
115 3     3 1 51 my $self = bless {
116             open => [],
117             cas => 0,
118             extra_data => {},
119             }, shift;
120 3         13 while( @_ ) {
121 7         4820 my $key = shift;
122 7 100       27 if( $key eq 'cmd' ) {
    100          
    50          
123 3         18 $self->{cmd} = shift;
124             }
125             elsif( $key eq 'open' ) {
126 3         5 $self->open(@$_) for @{shift()};
  3         17  
127             }
128             elsif( $key eq 'no_extra' ) {
129 1         5 $self->{no_extra} = shift;
130             }
131             }
132 3         5933 return $self;
133             }
134              
135             =head2 $server->serve($fh)
136              
137             Assign an accepted client socket to the server.
138             Instead of accepting and serving clients on centain listening port automatically,
139             you can also serve clients manually by this method.
140              
141             =cut
142              
143             sub serve {
144 4     4 1 9 my($self, $fh) = @_;
145              
146 4         7 my $client;
147             $client = AnyEvent::Handle->new(
148             fh => $fh,
149             on_error => sub {
150             $self->_end( sub {
151 0         0 undef $client;
152 0     0   0 } );
153             },
154 4         40 );
155             $self->_begin( sub {
156             $client->push_read( line => sub {
157             #$client->push_write("line: $_[1]\n");
158 46 100       112422 if( my($cmd, $key, $flag, $expire, $size, $cas, $noreply) = $_[1] =~ /^ *(set|add|replace|append|prepend|cas) +([^ ]+) +(\d+) +(\d+) +(\d+)(?: +(\d+))?( +noreply)? *$/ ) {
    100          
    100          
    100          
    50          
    50          
    0          
    0          
    0          
159             $client->unshift_read( chunk => $size, sub {
160 18         1704 my $data_ref = \$_[1];
161             $client->unshift_read( line => sub {
162 18 50       1068 if( $_[1] eq '' ) {
163 18 100       118 if( $cmd eq 'set' ) {
    100          
    100          
    100          
    100          
    50          
164 4         20 $self->_set($client, $noreply, $key, $flag, $expire, $$data_ref);
165             }
166             elsif( $cmd eq 'add' ) {
167             $self->_find( sub {
168 2 100       7 if( $_[0] ) {
169 1 50       6 $client->push_write("NOT_STORED\r\n") unless $noreply;
170             }
171             else {
172 1         5 $self->_set($client, $noreply, $key, $flag, $expire, $$data_ref);
173             }
174 2         17 }, $key, $client );
175             }
176             elsif( $cmd eq 'replace' ) {
177             $self->_find( sub {
178 2 100       6 if( $_[0] ) {
179 1         6 $self->_set($client, $noreply, $key, $flag, $expire, $$data_ref);
180             }
181             else {
182 1 50       14 $client->push_write("NOT_STORED\r\n") unless $noreply;
183             }
184 2         12 }, $key, $client );
185             }
186             elsif( $cmd eq 'cas' ) {
187             $self->_find( sub {
188 6 100       17 if( $_[0] ) {
189 4 100 100     27 if( $self->{no_extra} || $self->{extra_data}{$key}[2]==$cas ) {
190 3         142 $self->_set($client, $noreply, $key, $flag, $expire, $$data_ref);
191             }
192             else {
193 1 50       8 $client->push_write("EXISTS\r\n") unless $noreply;
194             }
195             }
196             else {
197 2 50       16 $client->push_write("NOT_FOUND\r\n") unless $noreply;
198             }
199 6         74 }, $key, $client );
200             }
201             elsif( $cmd eq 'prepend' ) {
202             $self->_get( sub {
203 2 100       8 if( $_[0] ) {
204 1         12 $self->_set($client, $noreply, $key, -1, -1, "$$data_ref$_[1]");
205             }
206             else {
207 1 50       7 $client->push_write("NOT_STORED\r\n") unless $noreply;
208             }
209 2         13 }, $key, $client );
210             }
211             elsif( $cmd eq 'append' ) {
212             $self->_get( sub {
213 2 100       7 if( $_[0] ) {
214 1         9 $self->_set($client, $noreply, $key, -1, -1, "$_[1]$$data_ref");
215             }
216             else {
217 1 50       11 $client->push_write("NOT_STORED\r\n") unless $noreply;
218             }
219 2         14 }, $key, $client );
220             }
221             }
222             else {
223 0 0       0 $client->push_write("CLIENT_ERROR bad data chunk\r\n") unless $noreply;
224 0         0 $client->push_write("ERROR\r\n");
225             }
226 18         136 } );
227 18         200 } );
228             }
229             elsif( $_[1] =~ /^ *(gets?) +([^ ].*) *$/ ) {
230 19         668 my($cmd, $keys) = ($1, $2);
231 19         36 my $n = 0;
232 19         29 my $curr = 0;
233 19         34 my @status;
234             my @data;
235 0         0 my $end;
236 19         690 while( $keys =~ /([^ ]+)/g ) {
237 21         48 my $key = $1;
238 21         36 my $i = $n++;
239             $self->_get( sub {
240 21         136 $status[$i-$curr] = $_[0];
241 21         69 $data[$i-$curr] = $_[1];
242 21   66     209 while( $curr<$n && defined $status[0] ) {
243 21 100       592 if( shift @status ) {
244 15 100       206 $client->push_write("VALUE $key $e{ $self->{no_extra} ? 0 : $self->{extra_data}{$key}[1] } $e{length $data[0]}");
245 15 100       2119 $client->push_write(" $e{ $self->{no_extra} ? 0 : $self->{extra_data}{$key}[2] }") if( $cmd eq 'gets' );
    100          
246 15         389 $client->push_write("\r\n");
247 15         4545 $client->push_write($data[0]);
248 15         1202 $client->push_write("\r\n");
249 15         1992 shift @data;
250             }
251 21         100 ++$curr;
252             }
253 21 50 33     472 $client->push_write("END\r\n") if( $end && $curr==$n );
254 21         297 }, $key, $client );
255             }
256 19 50       55 if( $curr==$n ) {
257 19         239 $client->push_write("END\r\n");
258             }
259             else {
260 0         0 $end = 1;
261             }
262             }
263             elsif( ($key, $noreply) = $_[1] =~ /^ *delete +([^ ]+)( +noreply)? *$/ ) {
264             $self->_delete( sub {
265 2 50       10 if( !$noreply ) {
266 2 100       13 if( $_[0] ) {
267 1         5 $client->push_write("DELETED\r\n");
268             }
269             else {
270 1         5 $client->push_write("NOT_FOUND\r\n");
271             }
272             }
273 2         19 }, $key, $client );
274             }
275             elsif( ($cmd, $key, my $val, $noreply) = $_[1] =~ /^ *(incr|decr) +([^ ]+) +(\d+)( +noreply)? *$/ ) {
276             $self->_get( sub {
277 5 100       16 if( $_[0] ) {
278 3 100       10 if( $cmd eq 'incr' ) {
279 3     3   37 no warnings 'numeric';
  3         6  
  3         210  
280 1         7 $val = $_[1] + $val;
281             }
282             else {
283 3     3   15 no warnings 'numeric';
  3         6  
  3         5983  
284 2         8 $val = $_[1] - $val;
285 2 100       10 $val = 0 if $val<0;
286             }
287 3 50       18 $self->_set($client, sub { $client->push_write("$val\r\n") unless $noreply }, $key, -1, -1, $val);
  3         34  
288             }
289             else {
290 2 50       175 $client->push_write("NOT_FOUND\r\n") unless $noreply;
291             }
292 5         39 }, $key, $client );
293             }
294             elsif( $_[1] =~ /^ *stats *$/ ) {
295 0         0 $client->push_write("END\r\n");
296             }
297             elsif( ($noreply) = $_[1] =~ /^ *flush_all( +noreply)? *$/ ) {
298             $self->_flush_all( sub {
299 2 50       14 $client->push_write("OK\r\n") unless $noreply;
300 2         17 } );
301             }
302             elsif( $_[1] =~ /^ *verbosity( +noreply)? *$/ ) {
303 0 0       0 $client->push_write("OK\r\n") if !$1;
304             }
305             elsif( $_[1] =~ /^ *version *$/ ) {
306 0         0 $client->push_write("VERSION 1.4.4\r\n");
307             }
308             elsif( $_[1] =~ /^ *quit *$/ ) {
309 0         0 $client->push_shutdown;
310             }
311             else {
312 0         0 $client->push_write("ERROR\r\n");
313             }
314 46         8236 $client->push_read( line => callee );
315 4     4   80 } );
316 4         248 }, $client );
317             }
318              
319             =head2 $server->open( host, port )
320              
321             Functions like the 'open' parameter of the 'new' method.
322             The 'new' method will put each element of the 'open' parameter to this method indeed.
323              
324             =cut
325              
326             sub open {
327 3     3 1 5 my $self = shift;
328 3     4   5 push @{$self->{open}}, [ $_[0], $_[1], tcp_server( $_[0], $_[1], sub { $self->serve($_[0]) } ) ];
  3         35  
  4         9382  
329             }
330              
331             =head2 $server->close( host, port )
332              
333             Close and stop listening to certain host-port.
334              
335             =cut
336              
337             sub close {
338 0     0 1 0 my $self = shift;
339 0 0       0 $self->{open} = [ grep { $_->[0] ne $_[0] or $_->[1] != $_[1] } @{$self->{open}} ];
  0         0  
  0         0  
340             }
341              
342             =head2 $server->close_all
343              
344             Close all the listening host-port.
345              
346             =cut
347              
348             sub close_all {
349 0     0 1 0 my $self = shift;
350 0         0 $self->{open} = [];
351             }
352              
353             =head2 $server->_set, $server->_find, $server->_get, $server->_delete, $server->_flush_all, $server->_begin, $server->_end
354              
355             These methods are the main function methods used by the server.
356             They should be used or overrided when you implementing your own server and you want
357             to do something SPECIAL. Please read the source for better understanding.
358              
359             =cut
360              
361             sub _set {
362 14     14   46 my($self, $client, $noreply, $key, $flag, $expire) = @_;
363             $self->{cmd}{set}( sub {
364 14     14   38 my($status, $msg) = @_;
365 14 50       40 if( $status==1 ) {
    0          
    0          
366 14 50 33     60 $expire += time if $expire>0 && $expire<=2592000;
367 14 100       66 if( !$self->{no_extra} ) {
368 11 100       27 if( $expire<0 ) {
369 5         17 $self->{extra_data}{$key}[2] = ++$self->{cas};
370             }
371             else {
372 6         276 $self->{extra_data}{$key} = [$expire, $flag, ++$self->{cas}];
373             }
374             }
375 14 100       74 $client->push_write("STORED\r\n") unless $noreply;
376 14 100       3293 $noreply->() if ref($noreply) eq 'CODE';
377             }
378             elsif( $status==-1 ) {
379 0         0 $client->push_write("SERVER_ERROR $msg\r\n");
380             }
381             elsif( $status==-2 ) {
382 0         0 $client->push_write("SERVER_ERROR $msg\r\n", 1);
383 0         0 $client->push_shutdown;
384             }
385             else {
386 0         0 warn "Unknown 'set' callback status: $status";
387 0         0 $client->push_write("SERVER_ERROR $msg\r\n");
388             }
389 14         196 }, $key, $flag, $expire, $_[6], $client);
390             }
391              
392             sub _find {
393 10     10   27 my($self, $cb, $key, $client) = @_;
394 10 100 100     74 if( $self->{no_extra} || exists $self->{extra_data}{$key} ) {
395 7 50 66     48 if( $self->{no_extra} || !$self->{extra_data}{$key}[0] || $self->{extra_data}{$key}[0]>time ) {
      33        
396 7         43 $self->{cmd}{_find}($cb, $key, $client);
397             }
398             else {
399 0     0   0 $self->_delete( sub { $cb->(0) }, $key, $client );
  0         0  
400             }
401             }
402             else {
403 3         22 $cb->(0);
404             }
405             }
406              
407             sub _get {
408 30     30   76 my($self, $cb, $key, $client) = @_;
409 30 100 100     214 if( $self->{no_extra} || exists $self->{extra_data}{$key} ) {
410 22 50 66     161 if( $self->{no_extra} || !$self->{extra_data}{$key}[0] || $self->{extra_data}{$key}[0]>time ) {
      33        
411 22         595 $self->{cmd}{get}->($cb, $key, $client);
412             }
413             else {
414 0     0   0 $self->_delete( sub { $cb->(0) }, $key, $client );
  0         0  
415             }
416             }
417             else {
418 8         26 $cb->(0);
419             }
420             }
421              
422             sub _delete {
423 2     2   8 my($self, $cb, $key, $client) = @_;
424 2 100 66     19 if( $self->{no_extra} || exists $self->{extra_data}{$key} ) {
425 1         4 my $extra_data = delete $self->{extra_data}{$key};
426 1 50 33 0   17 $self->{cmd}{delete}->( !$extra_data->[0] || $extra_data->[0]>time ? $cb : sub { $cb->(0) }, $key, $client );
  0         0  
427             }
428             else {
429 1         5 $cb->(0);
430             }
431             }
432              
433             sub _flush_all {
434 2     2   6 my($self, $cb, $client) = @_;
435             $self->{cmd}{flush_all}->( sub {
436 2     2   7 $self->{extra_data} = {};
437 2         11 $cb->();
438 2         19 }, $client );
439             }
440              
441             sub _begin {
442 4     4   9 my($self, $cb, $client) = @_;
443 4 100       16 if( exists $self->{cmd}{_begin} ) {
444 2         8 $self->{cmd}{_begin}->($cb, $client);
445             }
446             else {
447 2         5 $cb->();
448             }
449             }
450              
451             sub _end {
452 0     0     my($self, $cb, $client) = @_;
453 0 0         if( exists $self->{cmd}{_end} ) {
454 0           $self->{cmd}{_end}->($cb, $client);
455             }
456             else {
457 0           $cb->();
458             }
459             }
460              
461             =head1 SEE ALSO
462              
463             L, L, L
464              
465             =head1 AUTHOR
466              
467             Cindy Wang (CindyLinz)
468              
469             =head1 BUGS
470              
471             Please report any bugs or feature requests to C, or through
472             the web interface at L. I will be notified, and then you'll
473             automatically be notified of progress on your bug as I make changes.
474              
475             =head1 SUPPORT
476              
477             You can find documentation for this module with the perldoc command.
478              
479             perldoc Memcached::Server
480              
481              
482             You can also look for information at:
483              
484             =over 4
485              
486             =item * RT: CPAN's request tracker
487              
488             L
489              
490             =item * AnnoCPAN: Annotated CPAN documentation
491              
492             L
493              
494             =item * CPAN Ratings
495              
496             L
497              
498             =item * Search CPAN
499              
500             L
501              
502             =back
503              
504             =head1 LICENSE AND COPYRIGHT
505              
506             Copyright 2010 Cindy Wang (CindyLinz).
507              
508             This program is free software; you can redistribute it and/or modify it
509             under the terms of either: the GNU General Public License as published
510             by the Free Software Foundation; or the Artistic License.
511              
512             See http://dev.perl.org/licenses/ for more information.
513              
514              
515             =cut
516              
517             1; # End of Memcached::Server