File Coverage

blib/lib/MogileFS/Server.pm
Criterion Covered Total %
statement 156 246 63.4
branch 7 42 16.6
condition 1 14 7.1
subroutine 49 68 72.0
pod 0 2 0.0
total 213 372 57.2


line stmt bran cond sub pod time code
1             package MogileFS::Server;
2 21     21   1054895 use strict;
  21         151  
  21         488  
3 21     21   88 use warnings;
  21         28  
  21         525  
4 21     21   102 use vars qw($VERSION);
  21         32  
  21         1118  
5             $VERSION = "2.73";
6              
7             =head1 NAME
8              
9             MogileFS::Server - MogileFS (distributed filesystem) server
10              
11             =head1 SYNOPSIS
12              
13             $s = MogileFS::Server->server;
14             $s->run;
15              
16             =cut
17              
18 21     21   8081 use IO::Socket;
  21         340812  
  21         64  
19 21     21   7016 use Symbol;
  21         52  
  21         858  
20 21     21   8419 use POSIX;
  21         101587  
  21         105  
21 21     21   53131 use File::Copy ();
  21         73722  
  21         496  
22 21     21   109 use Carp;
  21         35  
  21         951  
23 21     21   117 use File::Basename ();
  21         35  
  21         255  
24 21     21   92 use File::Path ();
  21         34  
  21         248  
25 21     21   9765 use Sys::Syslog ();
  21         139361  
  21         442  
26 21     21   6642 use Time::HiRes ();
  21         16992  
  21         430  
27 21     21   7875 use Net::Netmask;
  21         97662  
  21         1353  
28 21     21   124 use List::Util;
  21         43  
  21         1191  
29 21     21   111 use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
  21         35  
  21         1090  
30              
31 21     21   7022 use MogileFS::Util qw(daemonize);
  21         43  
  21         1067  
32 21     21   7339 use MogileFS::Config;
  21         49  
  21         1981  
33              
34 21     21   120 use MogileFS::ProcManager;
  21         35  
  21         378  
35 21     21   132 use MogileFS::Connection::Client;
  21         50  
  21         401  
36 21     21   128 use MogileFS::Connection::Worker;
  21         64  
  21         365  
37              
38 21     21   10445 use MogileFS::Worker::Query;
  21         47  
  21         552  
39 21     21   7247 use MogileFS::Worker::Delete;
  21         45  
  21         490  
40 21     21   8588 use MogileFS::Worker::Replicate;
  21         50  
  21         539  
41 21     21   6895 use MogileFS::Worker::Reaper;
  21         55  
  21         497  
42 21     21   8213 use MogileFS::Worker::Monitor;
  21         49  
  21         506  
43 21     21   7658 use MogileFS::Worker::Fsck;
  21         46  
  21         458  
44 21     21   7202 use MogileFS::Worker::JobMaster;
  21         47  
  21         476  
45              
46 21     21   6524 use MogileFS::Factory::Domain;
  21         43  
  21         460  
47 21     21   6517 use MogileFS::Factory::Class;
  21         44  
  21         531  
48 21     21   6609 use MogileFS::Factory::Host;
  21         52  
  21         542  
49 21     21   6582 use MogileFS::Factory::Device;
  21         53  
  21         484  
50 21     21   117 use MogileFS::Domain;
  21         35  
  21         358  
51 21     21   89 use MogileFS::Class;
  21         37  
  21         337  
52 21     21   83 use MogileFS::Host;
  21         34  
  21         297  
53 21     21   83 use MogileFS::Device;
  21         34  
  21         303  
54              
55 21     21   89 use MogileFS::HTTPFile;
  21         39  
  21         339  
56 21     21   6880 use MogileFS::FID;
  21         49  
  21         515  
57 21     21   6727 use MogileFS::DevFID;
  21         45  
  21         485  
58              
59 21     21   170 use MogileFS::Store;
  21         39  
  21         2007  
60              
61 21     21   6848 use MogileFS::ReplicationPolicy::MultipleHosts;
  21         45  
  21         12202  
62              
63             my $server; # server singleton
64             sub server {
65 0     0 0 0 my ($pkg) = @_;
66 0   0     0 return $server ||= bless {}, $pkg;
67             }
68              
69             # --------------------------------------------------------------------------
70             # instance methods:
71             # --------------------------------------------------------------------------
72              
73             sub run {
74 0     0 0 0 my $self = shift;
75              
76 0         0 MogileFS::Config->load_config;
77              
78             # don't run as root
79 0 0 0     0 die "mogilefsd cannot be run as root\n"
80             if $< == 0 && MogileFS->config('user') ne "root";
81              
82 0         0 MogileFS::Config->check_database;
83 0 0       0 daemonize() if MogileFS->config("daemonize");
84              
85 0         0 MogileFS::ProcManager->set_min_workers('monitor' => 1);
86              
87             # open up our log
88 0         0 Sys::Syslog::openlog('mogilefsd', 'pid', 'daemon');
89 0         0 Mgd::log('info', 'beginning run');
90              
91 0 0       0 unless (MogileFS::ProcManager->write_pidfile) {
92 0         0 Mgd::log('info', "Couldn't write pidfile, ending run");
93 0         0 Sys::Syslog::closelog();
94 0         0 exit 1;
95             }
96              
97             # Install signal handlers.
98             $SIG{TERM} = sub {
99 0     0   0 my @children = MogileFS::ProcManager->child_pids;
100 0 0       0 print STDERR scalar @children, " children to kill.\n" if $DEBUG;
101 0         0 my $count = kill( 'TERM' => @children );
102 0 0       0 print STDERR "Sent SIGTERM to $count children.\n" if $DEBUG;
103 0         0 MogileFS::ProcManager->remove_pidfile;
104 0         0 Mgd::log('info', 'ending run due to SIGTERM');
105 0         0 Sys::Syslog::closelog();
106              
107 0         0 exit 0;
108 0         0 };
109              
110             $SIG{INT} = sub {
111 0     0   0 my @children = MogileFS::ProcManager->child_pids;
112 0 0       0 print STDERR scalar @children, " children to kill.\n" if $DEBUG;
113 0         0 my $count = kill( 'INT' => @children );
114 0 0       0 print STDERR "Sent SIGINT to $count children.\n" if $DEBUG;
115 0         0 MogileFS::ProcManager->remove_pidfile;
116 0         0 Mgd::log('info', 'ending run due to SIGINT');
117 0         0 exit 0;
118 0         0 };
119 0         0 $SIG{PIPE} = 'IGNORE'; # catch them by hand
120              
121             # setup server sockets to listen for client connections
122 0         0 my @servers;
123 0         0 foreach my $listen (@{ MogileFS->config('listen') }) {
  0         0  
124 0 0       0 my $server = IO::Socket::INET->new(LocalAddr => $listen,
125             Type => SOCK_STREAM,
126             Proto => 'tcp',
127             Blocking => 0,
128             Reuse => 1,
129             Listen => 1024 )
130             or die "Error creating socket: $@\n";
131 0         0 $server->sockopt(SO_KEEPALIVE, 1);
132 0         0 $server->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
133              
134             # save sub to accept a client
135 0         0 push @servers, $server;
136             Danga::Socket->AddOtherFds( fileno($server) => sub {
137 0     0   0 while (my $csock = $server->accept) {
138 0         0 MogileFS::Connection::Client->new($csock);
139             }
140 0         0 } );
141             }
142              
143             MogileFS::ProcManager->push_pre_fork_cleanup(sub {
144             # so children don't hold server connection open
145 0     0   0 close($_) foreach @servers;
146 0         0 });
147              
148             # setup the post event loop callback to spawn jobs, and the timeout
149 0         0 Danga::Socket->DebugLevel(3);
150 0         0 Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
151 0         0 Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);
152              
153             # and now, actually start listening for events
154 0         0 eval {
155 0 0       0 print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
156 0         0 Danga::Socket->EventLoop();
157             };
158              
159 0 0       0 if ($@) {
160 0         0 Mgd::log('err', "crash log: $@");
161 0         0 exit 1;
162             }
163 0         0 Mgd::log('info', 'ending run');
164 0         0 Sys::Syslog::closelog();
165 0         0 exit(0);
166             }
167              
168             # --------------------------------------------------------------------------
169              
170             package MogileFS;
171             # just so MogileFS->config($key) will work:
172 21     21   150 use MogileFS::Config qw(config);
  21         50  
  21         3667  
173              
174             my %hooks;
175              
176             sub register_worker_command {
177             # just pass this through to the Worker class
178 0     0   0 return MogileFS::Worker::Query::register_command(@_);
179             }
180              
181             sub register_global_hook {
182 0     0   0 $hooks{$_[0]} = $_[1];
183 0         0 return 1;
184             }
185              
186             sub unregister_global_hook {
187 0     0   0 delete $hooks{$_[0]};
188 0         0 return 1;
189             }
190              
191             sub run_global_hook {
192 0     0   0 my $hookname = shift;
193 0         0 my $ref = $hooks{$hookname};
194 0 0       0 return $ref->(@_) if defined $ref;
195 0         0 return undef;
196             }
197              
198             # --------------------------------------------------------------------------
199              
200             package Mgd; # conveniently short name
201 21     21   132 use strict;
  21         39  
  21         552  
202 21     21   106 use warnings;
  21         34  
  21         658  
203 21     21   103 use MogileFS::Config;
  21         31  
  21         1847  
204 21     21   116 use MogileFS::Util qw(error fatal debug); # for others calling Mgd::foo()
  21         39  
  21         920  
205 21     21   102 use Socket qw(SOL_SOCKET SO_RCVBUF AF_UNIX SOCK_STREAM PF_UNSPEC);
  21         34  
  21         2616  
206             BEGIN {
207             # detect the receive buffer size for Unix domain stream sockets,
208             # we assume the size is identical across all Unix domain sockets.
209 21 50   21   1303 socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
210             or die( "socketpair failed: $!" );
211              
212 21         280 my $r = getsockopt($s1, SOL_SOCKET, SO_RCVBUF);
213 21 50       95 defined $r or die "getsockopt: $!";
214 21 50       143 $r = unpack('i', $r) if defined $r;
215 21 50 33     141 $r = (defined $r && $r > 0) ? $r : 8192;
216 21         289 close $s1;
217 21         188 close $s2;
218 21     21   1343 eval 'use constant UNIX_RCVBUF_SIZE => $r';
  21         116  
  21         35  
  21         8340  
219             }
220              
221             sub server {
222 0     0   0 return MogileFS::Server->server;
223             }
224              
225             # database checking/connecting
226             sub validate_dbh {
227 0     0   0 my $sto = Mgd::get_store();
228 0         0 my $had_dbh = $sto->have_dbh;
229 0         0 $sto->recheck_dbh();
230 0         0 my $dbh;
231 0         0 eval { $dbh = $sto->dbh };
  0         0  
232             # Doesn't matter what the failure was; workers should retry later.
233 0 0 0     0 error("Error validating master DB: $@") if $@ && $had_dbh;
234 0         0 return $dbh;
235             }
236              
237             # the eventual replacement for callers asking for a dbh directly:
238             # they'll ask for the current store, which is a database abstraction
239             # layer.
240             my ($store, $store_pid);
241             sub get_store {
242 0 0 0 0   0 return $store if $store && $store_pid == $$;
243 0         0 $store_pid = $$;
244 0         0 return $store = MogileFS::Store->new;
245             }
246              
247             sub close_store {
248 0 0   0   0 if ($store) {
249 0         0 $store->dbh->disconnect();
250 0         0 $store = undef;
251 0         0 return 1;
252             }
253 0         0 return 0;
254             }
255              
256             # only for t/ scripts to explicitly set a store, without loading in a config
257             sub set_store {
258 0     0   0 my ($s) = @_;
259 0         0 $store = $s;
260 0         0 $store_pid = $$;
261             }
262              
263             sub domain_factory {
264 0     0   0 return MogileFS::Factory::Domain->get_factory;
265             }
266              
267             sub class_factory {
268 0     0   0 return MogileFS::Factory::Class->get_factory;
269             }
270              
271             sub host_factory {
272 0     0   0 return MogileFS::Factory::Host->get_factory;
273             }
274              
275             sub device_factory {
276 0     0   0 return MogileFS::Factory::Device->get_factory;
277             }
278              
279             # log stuff to syslog or the screen
280             sub log {
281             # simple logging functionality
282 4 50   4   26 if (! $MogileFS::Config::daemonize) {
283 4         33 $| = 1;
284             # syslog acts like printf so we have to use printf and append a \n
285 4         11 shift; # ignore the first parameter (info, warn, critical, etc)
286 4         7 my $mask = shift; # format string
287 4 50       45 $mask .= "\n" unless $mask =~ /\n$/;
288 4 50       15 my $message = @_ ? sprintf($mask, @_) : $mask;
289 4         310 print '[', scalar localtime(), '] ', $message;
290             } else {
291             # just pass the parameters to syslog
292 0           Sys::Syslog::syslog(@_);
293             }
294             }
295              
296             1;
297             __END__