File Coverage

blib/lib/MogileFS/Server.pm
Criterion Covered Total %
statement 156 246 63.4
branch 7 42 16.6
condition 1 14 7.1
subroutine 49 68 72.0
pod 0 2 0.0
total 213 372 57.2


line stmt bran cond sub pod time code
1             package MogileFS::Server;
2 21     21   351598 use strict;
  21         39  
  21         789  
3 21     21   90 use warnings;
  21         30  
  21         676  
4 21     21   92 use vars qw($VERSION);
  21         28  
  21         1231  
5             $VERSION = "2.72";
6              
7             =head1 NAME
8              
9             MogileFS::Server - MogileFS (distributed filesystem) server
10              
11             =head1 SYNOPSIS
12              
13             $s = MogileFS::Server->server;
14             $s->run;
15              
16             =cut
17              
18 21     21   10687 use IO::Socket;
  21         378853  
  21         106  
19 21     21   9240 use Symbol;
  21         37  
  21         1211  
20 21     21   10817 use POSIX;
  21         116760  
  21         134  
21 21     21   58429 use File::Copy ();
  21         77795  
  21         583  
22 21     21   134 use Carp;
  21         25  
  21         1521  
23 21     21   99 use File::Basename ();
  21         26  
  21         228  
24 21     21   292 use File::Path ();
  21         26  
  21         204  
25 21     21   13437 use Sys::Syslog ();
  21         150335  
  21         581  
26 21     21   346929 use Time::HiRes ();
  21         23553  
  21         563  
27 21     21   10535 use Net::Netmask;
  21         102266  
  21         2425  
28 21     21   195 use List::Util;
  21         34  
  21         1274  
29 21     21   99 use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
  21         29  
  21         1457  
30              
31 21     21   9309 use MogileFS::Util qw(daemonize);
  21         48  
  21         1565  
32 21     21   8466 use MogileFS::Config;
  21         64  
  21         3068  
33              
34 21     21   199 use MogileFS::ProcManager;
  21         41  
  21         561  
35 21     21   148 use MogileFS::Connection::Client;
  21         34  
  21         374  
36 21     21   82 use MogileFS::Connection::Worker;
  21         27  
  21         379  
37              
38 21     21   12985 use MogileFS::Worker::Query;
  21         99  
  21         768  
39 21     21   10683 use MogileFS::Worker::Delete;
  21         58  
  21         676  
40 21     21   9943 use MogileFS::Worker::Replicate;
  21         72  
  21         757  
41 21     21   9378 use MogileFS::Worker::Reaper;
  21         55  
  21         518  
42 21     21   9019 use MogileFS::Worker::Monitor;
  21         64  
  21         697  
43 21     21   9700 use MogileFS::Worker::Fsck;
  21         57  
  21         577  
44 21     21   8988 use MogileFS::Worker::JobMaster;
  21         53  
  21         589  
45              
46 21     21   7994 use MogileFS::Factory::Domain;
  21         49  
  21         491  
47 21     21   6967 use MogileFS::Factory::Class;
  21         53  
  21         574  
48 21     21   7914 use MogileFS::Factory::Host;
  21         68  
  21         670  
49 21     21   8517 use MogileFS::Factory::Device;
  21         56  
  21         614  
50 21     21   154 use MogileFS::Domain;
  21         31  
  21         470  
51 21     21   98 use MogileFS::Class;
  21         32  
  21         382  
52 21     21   89 use MogileFS::Host;
  21         34  
  21         327  
53 21     21   80 use MogileFS::Device;
  21         29  
  21         306  
54              
55 21     21   87 use MogileFS::HTTPFile;
  21         32  
  21         390  
56 21     21   8645 use MogileFS::FID;
  21         54  
  21         556  
57 21     21   7665 use MogileFS::DevFID;
  21         46  
  21         548  
58              
59 21     21   126 use MogileFS::Store;
  21         37  
  21         391  
60              
61 21     21   7550 use MogileFS::ReplicationPolicy::MultipleHosts;
  21         51  
  21         12812  
62              
63             my $server; # server singleton
64             sub server {
65 0     0 0 0 my ($pkg) = @_;
66 0   0     0 return $server ||= bless {}, $pkg;
67             }
68              
69             # --------------------------------------------------------------------------
70             # instance methods:
71             # --------------------------------------------------------------------------
72              
73             sub run {
74 0     0 0 0 my $self = shift;
75              
76 0         0 MogileFS::Config->load_config;
77              
78             # don't run as root
79 0 0 0     0 die "mogilefsd cannot be run as root\n"
80             if $< == 0 && MogileFS->config('user') ne "root";
81              
82 0         0 MogileFS::Config->check_database;
83 0 0       0 daemonize() if MogileFS->config("daemonize");
84              
85 0         0 MogileFS::ProcManager->set_min_workers('monitor' => 1);
86              
87             # open up our log
88 0         0 Sys::Syslog::openlog('mogilefsd', 'pid', 'daemon');
89 0         0 Mgd::log('info', 'beginning run');
90              
91 0 0       0 unless (MogileFS::ProcManager->write_pidfile) {
92 0         0 Mgd::log('info', "Couldn't write pidfile, ending run");
93 0         0 Sys::Syslog::closelog();
94 0         0 exit 1;
95             }
96              
97             # Install signal handlers.
98             $SIG{TERM} = sub {
99 0     0   0 my @children = MogileFS::ProcManager->child_pids;
100 0 0       0 print STDERR scalar @children, " children to kill.\n" if $DEBUG;
101 0         0 my $count = kill( 'TERM' => @children );
102 0 0       0 print STDERR "Sent SIGTERM to $count children.\n" if $DEBUG;
103 0         0 MogileFS::ProcManager->remove_pidfile;
104 0         0 Mgd::log('info', 'ending run due to SIGTERM');
105 0         0 Sys::Syslog::closelog();
106              
107 0         0 exit 0;
108 0         0 };
109              
110             $SIG{INT} = sub {
111 0     0   0 my @children = MogileFS::ProcManager->child_pids;
112 0 0       0 print STDERR scalar @children, " children to kill.\n" if $DEBUG;
113 0         0 my $count = kill( 'INT' => @children );
114 0 0       0 print STDERR "Sent SIGINT to $count children.\n" if $DEBUG;
115 0         0 MogileFS::ProcManager->remove_pidfile;
116 0         0 Mgd::log('info', 'ending run due to SIGINT');
117 0         0 exit 0;
118 0         0 };
119 0         0 $SIG{PIPE} = 'IGNORE'; # catch them by hand
120              
121             # setup server sockets to listen for client connections
122 0         0 my @servers;
123 0         0 foreach my $listen (@{ MogileFS->config('listen') }) {
  0         0  
124 0 0       0 my $server = IO::Socket::INET->new(LocalAddr => $listen,
125             Type => SOCK_STREAM,
126             Proto => 'tcp',
127             Blocking => 0,
128             Reuse => 1,
129             Listen => 1024 )
130             or die "Error creating socket: $@\n";
131 0         0 $server->sockopt(SO_KEEPALIVE, 1);
132 0         0 $server->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
133              
134             # save sub to accept a client
135 0         0 push @servers, $server;
136             Danga::Socket->AddOtherFds( fileno($server) => sub {
137 0     0   0 while (my $csock = $server->accept) {
138 0         0 MogileFS::Connection::Client->new($csock);
139             }
140 0         0 } );
141             }
142              
143             MogileFS::ProcManager->push_pre_fork_cleanup(sub {
144             # so children don't hold server connection open
145 0     0   0 close($_) foreach @servers;
146 0         0 });
147              
148             # setup the post event loop callback to spawn jobs, and the timeout
149 0         0 Danga::Socket->DebugLevel(3);
150 0         0 Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
151 0         0 Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);
152              
153             # and now, actually start listening for events
154 0         0 eval {
155 0 0       0 print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
156 0         0 Danga::Socket->EventLoop();
157             };
158              
159 0 0       0 if ($@) {
160 0         0 Mgd::log('err', "crash log: $@");
161 0         0 exit 1;
162             }
163 0         0 Mgd::log('info', 'ending run');
164 0         0 Sys::Syslog::closelog();
165 0         0 exit(0);
166             }
167              
168             # --------------------------------------------------------------------------
169              
170             package MogileFS;
171             # just so MogileFS->config($key) will work:
172 21     21   135 use MogileFS::Config qw(config);
  21         37  
  21         3273  
173              
174             my %hooks;
175              
176             sub register_worker_command {
177             # just pass this through to the Worker class
178 0     0   0 return MogileFS::Worker::Query::register_command(@_);
179             }
180              
181             sub register_global_hook {
182 0     0   0 $hooks{$_[0]} = $_[1];
183 0         0 return 1;
184             }
185              
186             sub unregister_global_hook {
187 0     0   0 delete $hooks{$_[0]};
188 0         0 return 1;
189             }
190              
191             sub run_global_hook {
192 0     0   0 my $hookname = shift;
193 0         0 my $ref = $hooks{$hookname};
194 0 0       0 return $ref->(@_) if defined $ref;
195 0         0 return undef;
196             }
197              
198             # --------------------------------------------------------------------------
199              
200             package Mgd; # conveniently short name
201 21     21   101 use strict;
  21         31  
  21         571  
202 21     21   87 use warnings;
  21         174  
  21         612  
203 21     21   87 use MogileFS::Config;
  21         28  
  21         2568  
204 21     21   130 use MogileFS::Util qw(error fatal debug); # for others calling Mgd::foo()
  21         32  
  21         951  
205 21     21   104 use Socket qw(SOL_SOCKET SO_RCVBUF AF_UNIX SOCK_STREAM PF_UNSPEC);
  21         31  
  21         2816  
206             BEGIN {
207             # detect the receive buffer size for Unix domain stream sockets,
208             # we assume the size is identical across all Unix domain sockets.
209 21 50   21   1081 socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
210             or die( "socketpair failed: $!" );
211              
212 21         157 my $r = getsockopt($s1, SOL_SOCKET, SO_RCVBUF);
213 21 50       72 defined $r or die "getsockopt: $!";
214 21 50       144 $r = unpack('i', $r) if defined $r;
215 21 50 33     144 $r = (defined $r && $r > 0) ? $r : 8192;
216 21         2870 close $s1;
217 21         165 close $s2;
218 21     21   1458 eval 'use constant UNIX_RCVBUF_SIZE => $r';
  21         105  
  21         26  
  21         8288  
219             }
220              
221             sub server {
222 0     0   0 return MogileFS::Server->server;
223             }
224              
225             # database checking/connecting
226             sub validate_dbh {
227 0     0   0 my $sto = Mgd::get_store();
228 0         0 my $had_dbh = $sto->have_dbh;
229 0         0 $sto->recheck_dbh();
230 0         0 my $dbh;
231 0         0 eval { $dbh = $sto->dbh };
  0         0  
232             # Doesn't matter what the failure was; workers should retry later.
233 0 0 0     0 error("Error validating master DB: $@") if $@ && $had_dbh;
234 0         0 return $dbh;
235             }
236              
237             # the eventual replacement for callers asking for a dbh directly:
238             # they'll ask for the current store, which is a database abstraction
239             # layer.
240             my ($store, $store_pid);
241             sub get_store {
242 0 0 0 0   0 return $store if $store && $store_pid == $$;
243 0         0 $store_pid = $$;
244 0         0 return $store = MogileFS::Store->new;
245             }
246              
247             sub close_store {
248 0 0   0   0 if ($store) {
249 0         0 $store->dbh->disconnect();
250 0         0 $store = undef;
251 0         0 return 1;
252             }
253 0         0 return 0;
254             }
255              
256             # only for t/ scripts to explicitly set a store, without loading in a config
257             sub set_store {
258 0     0   0 my ($s) = @_;
259 0         0 $store = $s;
260 0         0 $store_pid = $$;
261             }
262              
263             sub domain_factory {
264 0     0   0 return MogileFS::Factory::Domain->get_factory;
265             }
266              
267             sub class_factory {
268 0     0   0 return MogileFS::Factory::Class->get_factory;
269             }
270              
271             sub host_factory {
272 0     0   0 return MogileFS::Factory::Host->get_factory;
273             }
274              
275             sub device_factory {
276 0     0   0 return MogileFS::Factory::Device->get_factory;
277             }
278              
279             # log stuff to syslog or the screen
280             sub log {
281             # simple logging functionality
282 4 50   4   22 if (! $MogileFS::Config::daemonize) {
283 4         22 $| = 1;
284             # syslog acts like printf so we have to use printf and append a \n
285 4         12 shift; # ignore the first parameter (info, warn, critical, etc)
286 4         5 my $mask = shift; # format string
287 4 50       32 $mask .= "\n" unless $mask =~ /\n$/;
288 4 50       19 my $message = @_ ? sprintf($mask, @_) : $mask;
289 4         1830 print '[', scalar localtime(), '] ', $message;
290             } else {
291             # just pass the parameters to syslog
292 0           Sys::Syslog::syslog(@_);
293             }
294             }
295              
296             1;
297             __END__