File Coverage

Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0

line stmt bran cond sub pod time code
1             package MogileFS::Store;
2 7     7   42 use strict;
  7         18  
  7         273  
3 7     7   44 use warnings;
  7         18  
  7         253  
4 7     7   40 use Carp qw(croak);
  7         15  
  7         395  
5 7     7   43 use MogileFS::Util qw(throw max);
  7         13  
  7         389  
6 7     7   14330 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
7             use List::Util ();
9             # this is incremented whenever the schema changes. server will refuse
10             # to start-up with an old schema version
11             #
12             # 8: adds fsck_log table
13             # 9: adds 'drain' state to enum in device table
14             use constant SCHEMA_VERSION => 9;
16             sub new {
17             my ($class) = @_;
18             return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass));
19             }
21             sub new_from_dsn_user_pass {
22             my ($class, $dsn, $user, $pass) = @_;
23             my $subclass;
24             if ($dsn =~ /^DBI:mysql:/i) {
25             $subclass = "MogileFS::Store::MySQL";
26             } elsif ($dsn =~ /^DBI:SQLite:/i) {
27             $subclass = "MogileFS::Store::SQLite";
28             } elsif ($dsn =~ /^DBI:Oracle:/i) {
29             $subclass = "MogileFS::Store::Oracle";
30             } elsif ($dsn =~ /^DBI:Pg:/i) {
31             $subclass = "MogileFS::Store::Postgres";
32             } else {
33             die "Unknown database type: $dsn";
34             }
35             unless (eval "use $subclass; 1") {
36             die "Error loading $subclass: $@\n";
37             }
38             my $self = bless {
39             dsn => $dsn,
40             user => $user,
41             pass => $pass,
42             raise_errors => $subclass->want_raise_errors,
43             slave_list_cachetime => 0,
44             slave_list_cache => [],
45             recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested
46             recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
47             }, $subclass;
48             $self->init;
49             return $self;
50             }
52             sub want_raise_errors {
53             # will default to true later
54             0;
55             }
57             sub new_from_mogdbsetup {
58             my ($class, %args) = @_;
59             # where args is: dbhost dbname dbrootuser dbrootpass dbuser dbpass
60             my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost});
62             my $try_make_sto = sub {
63             my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
64             PrintError => 0,
65             }) or return undef;
66             my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
67             $sto->raise_errors;
68             return $sto;
69             };
71             # upgrading, apparently, as this database already exists.
72             my $sto = $try_make_sto->();
73             return $sto if $sto;
75             # otherwise, we need to make the requested database, setup permissions, etc
76             $class->status("couldn't connect to database as mogilefs user. trying root...");
77             my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost});
78             my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
79             PrintError => 0,
80             }) or
81             die "Failed to connect to $dsn as specified root user: " . DBI->errstr . "\n";
82             $class->status("connected to database as root user.");
84             $class->confirm("Create database name '$args{dbname}'?");
85             $class->create_db_if_not_exists($rdbh, $args{dbname});
86             $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
87             $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
89             # should be ready now:
90             $sto = $try_make_sto->();
91             return $sto if $sto;
93             die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
94             }
96             # given a root DBI connection, create the named database. succeed
97             # if it it's made, or already exists. die otherwise.
98             sub create_db_if_not_exists {
99             my ($pkg, $rdbh, $dbname) = @_;
100             $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
101             or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
102             }
104             sub grant_privileges {
105             my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
106             $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
107             undef, $pass)
108             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
109             $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
110             undef, $pass)
111             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
112             }
114             sub can_replace { 0 }
115             sub can_insertignore { 0 }
116             sub can_insert_multi { 0 }
118             sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
120             sub ignore_replace {
121             my $self = shift;
122             return "INSERT IGNORE " if $self->can_insertignore;
123             return "REPLACE " if $self->can_replace;
124             die "Can't INSERT IGNORE or REPLACE?";
125             }
127             my $on_status = sub {};
128             my $on_confirm = sub { 1 };
129             sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
130             sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
131             sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
132             sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
134             sub latest_schema_version { SCHEMA_VERSION }
136             sub raise_errors {
137             my $self = shift;
138             $self->{raise_errors} = 1;
139             $self->dbh->{RaiseError} = 1;
140             }
142             sub dsn { $_[0]{dsn} }
143             sub user { $_[0]{user} }
144             sub pass { $_[0]{pass} }
146             sub init { 1 }
147             sub post_dbi_connect { 1 }
149             sub can_do_slaves { 0 }
151             sub mark_as_slave {
152             my $self = shift;
153             die "Incapable of becoming slave." unless $self->can_do_slaves;
155             $self->{slave} = 1;
156             }
158             sub is_slave {
159             my $self = shift;
160             return $self->{slave};
161             }
163             # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
164             sub _slaves_list {
165             my $self = shift;
166             my $now = time();
168             # only reload every 15 seconds.
169             if ($self->{slave_list_cachetime} > $now - 15) {
170             return @{$self->{slave_list_cache}};
171             }
172             $self->{slave_list_cachetime} = $now;
173             $self->{slave_list_cache} = [];
175             my $sk = MogileFS::Config->server_setting('slave_keys')
176             or return ();
178             my @ret;
179             foreach my $key (split /\s*,\s*/, $sk) {
180             my $slave = MogileFS::Config->server_setting("slave_$key");
181             my ($dsn, $user, $pass) = split /\|/, $slave;
182             push @ret, [$dsn, $user, $pass];
183             }
185             $self->{slave_list_cache} = \@ret;
186             return @ret;
187             }
189             sub get_slave {
190             my $self = shift;
192             die "Incapable of having slaves." unless $self->can_do_slaves;
194             return $self->{slave} if $self->check_slave;
196             my @slaves_list = $self->_slaves_list;
198             # If we have no slaves, then return silently.
199             return unless @slaves_list;
201             foreach my $slave_fulldsn (@slaves_list) {
202             my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
203             $self->{slave_next_check} = 0;
204             $newslave->mark_as_slave;
205             return $newslave
206             if $self->check_slave;
207             }
209             warn "Slave list exhausted, failing back to master.";
210             return;
211             }
213             sub read_store {
214             my $self = shift;
216             return $self unless $self->can_do_slaves;
218             if ($self->{slave_ok}) {
219             if (my $slave = $self->get_slave) {
220             $slave->{recheck_req_gen} = $self->{recheck_req_gen};
221             return $slave;
222             }
223             }
225             return $self;
226             }
228             sub slaves_ok {
229             my $self = shift;
230             my $coderef = shift;
232             return unless ref $coderef eq 'CODE';
234             local $self->{slave_ok} = 1;
236             return $coderef->(@_);
237             }
239             sub recheck_dbh {
240             my $self = shift;
241             $self->{recheck_req_gen}++;
242             }
244             sub dbh {
245             my $self = shift;
246             if ($self->{dbh}) {
247             if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
248             $self->{dbh} = undef unless $self->{dbh}->ping;
249             $self->{recheck_done_gen} = $self->{recheck_req_gen};
250             }
251             return $self->{dbh} if $self->{dbh};
252             }
254             $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
255             PrintError => 0,
256             AutoCommit => 1,
257             # FUTURE: will default to on (have to validate all callers first):
258             RaiseError => ($self->{raise_errors} || 0),
259             }) or
260             die "Failed to connect to database: " . DBI->errstr;
261             $self->post_dbi_connect;
262             return $self->{dbh};
263             }
265             sub ping {
266             my $self = shift;
267             return $self->dbh->ping;
268             }
270             sub condthrow {
271             my ($self, $optmsg) = @_;
272             my $dbh = $self->dbh;
273             return unless $dbh->err;
274             my ($pkg, $fn, $line) = caller;
275             my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
276             $msg .= ": $optmsg" if $optmsg;
277             croak($msg);
278             }
280             sub dowell {
281             my ($self, $sql, @do_params) = @_;
282             my $rv = eval { $self->dbh->do($sql, @do_params) };
283             return $rv unless $@ || $self->dbh->err;
284             warn "Error with SQL: $sql\n";
285             Carp::confess($@ || $self->dbh->errstr);
286             }
288             sub _valid_params {
289             croak("Odd number of parameters!") if scalar(@_) % 2;
290             my ($self, $vlist, %uarg) = @_;
291             my %ret;
292             $ret{$_} = delete $uarg{$_} foreach @$vlist;
293             croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
294             return %ret;
295             }
297             sub was_duplicate_error {
298             my $self = shift;
299             my $dbh = $self->dbh;
300             die "UNIMPLEMENTED";
301             }
303             # run a subref (presumably a database update) in an eval, because you expect it to
304             # maybe fail on duplicate key error, and throw a dup exception for you, else return
305             # its return value
306             sub conddup {
307             my ($self, $code) = @_;
308             my $rv = eval { $code->(); };
309             throw("dup") if $self->was_duplicate_error;
310             return $rv;
311             }
313             # insert row if doesn't already exist
314             # WARNING: This function is NOT transaction safe if the duplicate errors causes
315             # your transaction to halt!
316             # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
317             # is false! Rows before the duplicate will be inserted, but rows after the
318             # duplicate might not be, depending your database.
319             sub insert_ignore {
320             my ($self, $sql, @params) = @_;
321             my $dbh = $self->dbh;
322             if ($self->can_insertignore) {
323             return $dbh->do("INSERT IGNORE $sql", @params);
324             } else {
325             # TODO: Detect bad multi-row insert here.
326             my $rv = eval { $dbh->do("INSERT $sql", @params); };
327             if ($@ || $dbh->err) {
328             return 1 if $self->was_duplicate_error;
329             # This chunk is identical to condthrow, but we include it directly
330             # here as we know there is definetly an error, and we would like
331             # the caller of this function.
332             my ($pkg, $fn, $line) = caller;
333             my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
334             croak($msg);
335             }
336             return $rv;
337             }
338             }
340             # --------------------------------------------------------------------------
342             my @extra_tables;
344             sub add_extra_tables {
345             my $class = shift;
346             push @extra_tables, @_;
347             }
349             use constant TABLES => qw( domain class file tempfile file_to_delete
350             unreachable_fids file_on file_on_corrupt host
351             device server_settings file_to_replicate
352             file_to_delete_later fsck_log);
354             sub setup_database {
355             my $sto = shift;
357             # schema history:
358             # 8: adds fsck_log table
359             # 7: adds file_to_delete_later table
360             # 6: adds file_to_replicate table
361             my $curver = $sto->schema_version;
363             my $latestver = SCHEMA_VERSION;
364             if ($curver == $latestver) {
365             $sto->status("Schema already up-to-date at version $curver.");
366             return 1;
367             }
369             if ($curver > $latestver) {
370             die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
371             }
373             if ($curver) {
374             $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
375             }
377             foreach my $t (TABLES, @extra_tables) {
378             $sto->create_table($t);
379             }
381             $sto->upgrade_add_host_getport;
382             $sto->upgrade_add_host_altip;
383             $sto->upgrade_add_device_asof;
384             $sto->upgrade_add_device_weight;
385             $sto->upgrade_add_device_readonly;
386             $sto->upgrade_add_device_drain;
388             return 1;
389             }
391             sub schema_version {
392             my $self = shift;
393             my $dbh = $self->dbh;
394             return eval {
395             $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
396             } || 0;
397             }
399             sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
401             sub create_table {
402             my ($self, $table) = @_;
403             my $dbh = $self->dbh;
404             return 1 if $self->table_exists($table);
405             my $meth = "TABLE_$table";
406             my $sql = $self->$meth;
407             $sql = $self->filter_create_sql($sql);
408             $self->status("Running SQL: $sql;");
409             $dbh->do($sql) or
410             die "Failed to create table $table: " . $dbh->errstr;
411             my $imeth = "INDEXES_$table";
412             my @indexes = eval { $self->$imeth };
413             foreach $sql (@indexes) {
414             $self->status("Running SQL: $sql;");
415             $dbh->do($sql) or
416             die "Failed to create indexes on $table: " . $dbh->errstr;
417             }
418             }
420             # Please try to keep all tables aligned nicely
421             # with '"CREATE TABLE' on the first line
422             # and ')"' alone on the last line.
424             sub TABLE_domain {
425             # classes are tied to domains. domains can have classes of items
426             # with different mindevcounts.
427             #
428             # a minimum devcount is the number of copies the system tries to
429             # maintain for files in that class
430             #
431             # unspecified classname means classid=0 (implicit class), and that
432             # implies mindevcount=2
433             "CREATE TABLE domain (
435             namespace VARCHAR(255),
436             UNIQUE (namespace)
437             )"
438             }
440             sub TABLE_class {
441             "CREATE TABLE class (
442             dmid SMALLINT UNSIGNED NOT NULL,
443             classid TINYINT UNSIGNED NOT NULL,
444             PRIMARY KEY (dmid,classid),
445             classname VARCHAR(50),
446             UNIQUE (dmid,classname),
447             mindevcount TINYINT UNSIGNED NOT NULL
448             )"
449             }
451             # the length field is only here for easy verifications of content
452             # integrity when copying around. no sums or content types or other
453             # metadata here. application can handle that.
454             #
455             # classid is what class of file this belongs to. for instance, on fotobilder
456             # there will be a class for original pictures (the ones the user uploaded)
457             # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
458             # each domain can setup classes and assign the minimum redundancy level for
459             # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
460             # photos and and a 1 minimum for derived images (which means the sole device
461             # for a derived image can die, bringing devcount to 0 for that file, but
462             # the application can recreate it from its original)
463             sub TABLE_file {
464             "CREATE TABLE file (
465             fid INT UNSIGNED NOT NULL,
466             PRIMARY KEY (fid),
468             dmid SMALLINT UNSIGNED NOT NULL,
469             dkey VARCHAR(255), # domain-defined
470             UNIQUE dkey (dmid, dkey),
472             length INT UNSIGNED, # 4GB limit
474             classid TINYINT UNSIGNED NOT NULL,
475             devcount TINYINT UNSIGNED NOT NULL,
476             INDEX devcount (dmid,classid,devcount)
477             )"
478             }
480             sub TABLE_tempfile {
481             "CREATE TABLE tempfile (
483             PRIMARY KEY (fid),
485             createtime INT UNSIGNED NOT NULL,
486             classid TINYINT UNSIGNED NOT NULL,
487             dmid SMALLINT UNSIGNED NOT NULL,
488             dkey VARCHAR(255),
489             devids VARCHAR(60)
490             )"
491             }
493             # files marked for death when their key is overwritten. then they get a new
494             # fid, but since the old row (with the old fid) had to be deleted immediately,
495             # we need a place to store the fid so an async job can delete the file from
496             # all devices.
497             sub TABLE_file_to_delete {
498             "CREATE TABLE file_to_delete (
499             fid INT UNSIGNED NOT NULL,
500             PRIMARY KEY (fid)
501             )"
502             }
504             # if the replicator notices that a fid has no sources, that file gets inserted
505             # into the unreachable_fids table. it is up to the application to actually
506             # handle fids stored in this table.
507             sub TABLE_unreachable_fids {
508             "CREATE TABLE unreachable_fids (
509             fid INT UNSIGNED NOT NULL,
510             lastupdate INT UNSIGNED NOT NULL,
511             PRIMARY KEY (fid),
512             INDEX (lastupdate)
513             )"
514             }
516             # what files are on what devices? (most likely physical devices,
517             # as logical devices of RAID arrays would be costly, and mogilefs
518             # already handles redundancy)
519             #
520             # the devid index lets us answer "What files were on this now-dead disk?"
521             sub TABLE_file_on {
522             "CREATE TABLE file_on (
523             fid INT UNSIGNED NOT NULL,
524             devid MEDIUMINT UNSIGNED NOT NULL,
525             PRIMARY KEY (fid, devid),
526             INDEX (devid)
527             )"
528             }
530             # if application or framework detects an error in one of the duplicate files
531             # for whatever reason, it can register its complaint and the framework
532             # will do some verifications and fix things up w/ an async job
533             # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
534             # on the other devices?
535             sub TABLE_file_on_corrupt {
536             "CREATE TABLE file_on_corrupt (
537             fid INT UNSIGNED NOT NULL,
538             devid MEDIUMINT UNSIGNED NOT NULL,
539             PRIMARY KEY (fid, devid)
540             )"
541             }
543             # hosts (which contain devices...)
544             sub TABLE_host {
545             "CREATE TABLE host (
548             status ENUM('alive','dead','down'),
549             http_port MEDIUMINT UNSIGNED DEFAULT 7500,
550             http_get_port MEDIUMINT UNSIGNED,
552             hostname VARCHAR(40),
553             hostip VARCHAR(15),
554             altip VARCHAR(15),
555             altmask VARCHAR(18),
556             UNIQUE (hostname),
557             UNIQUE (hostip),
558             UNIQUE (altip)
559             )"
560             }
562             # disks...
563             sub TABLE_device {
564             "CREATE TABLE device (
565             devid MEDIUMINT UNSIGNED NOT NULL,
566             hostid MEDIUMINT UNSIGNED NOT NULL,
568             status ENUM('alive','dead','down'),
569             weight MEDIUMINT DEFAULT 100,
571             mb_total MEDIUMINT UNSIGNED,
572             mb_used MEDIUMINT UNSIGNED,
573             mb_asof INT UNSIGNED,
574             PRIMARY KEY (devid),
575             INDEX (status)
576             )"
577             }
579             sub TABLE_server_settings {
580             "CREATE TABLE server_settings (
581             field VARCHAR(50) PRIMARY KEY,
582             value VARCHAR(255)
583             )"
584             }
586             sub TABLE_file_to_replicate {
587             # nexttry is time to try to replicate it next.
588             # 0 means immediate. it's only on one host.
589             # 1 means lower priority. it's on 2+ but isn't happy where it's at.
590             # unixtimestamp means at/after that time. some previous error occurred.
591             # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
592             # failcount. how many times we've failed, just for doing backoff of nexttry.
593             # flags. reserved for future use.
594             "CREATE TABLE file_to_replicate (
596             nexttry INT UNSIGNED NOT NULL,
597             INDEX (nexttry),
598             fromdevid INT UNSIGNED,
599             failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
601             )"
602             }
604             sub TABLE_file_to_delete_later {
605             "CREATE TABLE file_to_delete_later (
607             delafter INT UNSIGNED NOT NULL,
608             INDEX (delafter)
609             )"
610             }
612             sub TABLE_fsck_log {
613             "CREATE TABLE fsck_log (
615             PRIMARY KEY (logid),
616             utime INT UNSIGNED NOT NULL,
617             fid INT UNSIGNED NULL,
618             evcode CHAR(4),
619             devid MEDIUMINT UNSIGNED,
620             INDEX(utime)
621             )"
622             }
624             # these five only necessary for MySQL, since no other database existed
625             # before, so they can just create the tables correctly to begin with.
626             # in the future, there might be new alters that non-MySQL databases
627             # will have to implement.
628             sub upgrade_add_host_getport { 1 }
629             sub upgrade_add_host_altip { 1 }
630             sub upgrade_add_device_asof { 1 }
631             sub upgrade_add_device_weight { 1 }
632             sub upgrade_add_device_readonly { 1 }
633             sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
635             # return true if deleted, 0 if didn't exist, exception if error
636             sub delete_host {
637             my ($self, $hostid) = @_;
638             return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
639             }
641             # return true if deleted, 0 if didn't exist, exception if error
642             sub delete_domain {
643             my ($self, $dmid) = @_;
644             return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
645             }
647             sub domain_has_files {
648             my ($self, $dmid) = @_;
649             my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
650             undef, $dmid);
651             return $has_a_fid ? 1 : 0;
652             }
654             sub class_has_files {
655             my ($self, $dmid, $clid) = @_;
656             my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
657             undef, $dmid, $clid);
658             return $has_a_fid ? 1 : 0;
659             }
661             # return new classid on success (non-zero integer), die on failure
662             # throw 'dup' on duplicate name
663             # override this if you want a less racy version.
664             sub create_class {
665             my ($self, $dmid, $classname) = @_;
666             my $dbh = $self->dbh;
668             # get the max class id in this domain
669             my $maxid = $dbh->selectrow_array
670             ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
672             # now insert the new class
673             my $rv = eval {
674             $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
675             undef, $dmid, $maxid + 1, $classname, 2);
676             };
677             if ($@ || $dbh->err) {
678             if ($self->was_duplicate_error) {
679             throw("dup");
680             }
681             }
682             return $maxid + 1 if $rv;
683             $self->condthrow;
684             die;
685             }
687             # return 1 on success, throw "dup" on duplicate name error, die otherwise
688             sub update_class_name {
689             my $self = shift;
690             my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
691             my $rv = eval {
692             $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
693             undef, $arg{classname}, $arg{dmid}, $arg{classid});
694             };
695             throw("dup") if $self->was_duplicate_error;
696             $self->condthrow;
697             return 1;
698             }
700             # return 1 on success, die otherwise
701             sub update_class_mindevcount {
702             my $self = shift;
703             my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
704             $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
705             undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
706             $self->condthrow;
707             return 1;
708             }
710             sub nfiles_with_dmid_classid_devcount {
711             my ($self, $dmid, $classid, $devcount) = @_;
712             return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
713             undef, $dmid, $classid, $devcount);
714             }
716             sub set_server_setting {
717             my ($self, $key, $val) = @_;
718             my $dbh = $self->dbh;
719             die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
721             if (defined $val) {
722             $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
723             } else {
724             $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
725             }
727             die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
728             return 1;
729             }
731             # FIXME: racy. currently the only caller doesn't matter, but should be fixed.
732             sub incr_server_setting {
733             my ($self, $key, $val) = @_;
734             $val = 1 unless defined $val;
735             return unless $val;
737             return 1 if $self->dbh->do("UPDATE server_settings ".
738             "SET value=value+? ".
739             "WHERE field=?", undef,
740             $val, $key) > 0;
741             $self->set_server_setting($key, $val);
742             }
744             sub server_setting {
745             my ($self, $key) = @_;
746             return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
747             undef, $key);
748             }
750             sub server_settings {
751             my ($self) = @_;
752             my $ret = {};
753             my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
754             $sth->execute;
755             while (my ($k, $v) = $sth->fetchrow_array) {
756             $ret->{$k} = $v;
757             }
758             return $ret;
759             }
# Register a tempfile row and return its fidid.
#
# If the fid argument is false (undef or 0), the fid is allocated by the
# database's autoincrement/sequence. Otherwise the given fid is used and
# returned.
#
# Returns the new/passed-in fidid on success.
# Throws 'dup' if an explicitly requested fid is already in use.
# Dies on other failures.
#
sub register_tempfile {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    my $fid = $arg{fid};

    my $explicit_fid_used = $fid ? 1 : 0;

    # Set up the new mapping. We store the devices that we picked for this
    # file in here, knowing that they might not be used; create_close is
    # responsible for actually mapping in file_on. When $fid is false, the
    # fid column is left out of the INSERT so that the database generates one.
    my $ins_tempfile = sub {
        my $rv = eval {
            # We must only pass the correct number of bind parameters.
            # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
            # Postgres, where you are expected to leave it out or use DEFAULT.
            # Leaving it out seems sanest and least likely to cause problems
            # with other databases.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Check $fid directly rather than $explicit_fid_used: the seeding
            # loop below changes $fid between calls of this closure.
            if ($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            }
            my $sql = "INSERT INTO tempfile (" . join(',', @keys) . ") VALUES (" . join(',', @vars) . ")";
            $dbh->do($sql, undef, @vals);
        };
        my $eval_err = $@;    # capture before any other call can clobber it
        if (!$rv) {
            return undef if $self->was_duplicate_error;
            # With RaiseError off the reason is only in errstr; with it on,
            # it is in $@. Report whichever one is available.
            my $err = $dbh->errstr;
            $err = $eval_err unless defined $err && length $err;
            die "Unexpected db error into tempfile: " . $err;
        }

        # No usable fid was supplied (undef or 0; both were left out of the
        # INSERT above), so fetch the one the database generated. Testing
        # only definedness here would skip this for fid=0 and make a
        # successful insert look like a failure.
        unless ($fid) {
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    # True when the current $fid already exists in the file table.
    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # If the fid is in use, do something about it.
    while ($fid_in_use->()) {
        throw("dup") if $explicit_fid_used;

        # Be careful of databases which reset their auto-increment/sequences
        # when the table is empty (InnoDB did/does this, for instance). So
        # re-seed the tempfile table with the highest known fid from the
        # file table by inserting a dummy row ...
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->(); # don't care about its result

        # ... then do a normal auto-increment.
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
# Look up a file by domain id and key.
# Returns a hashref with the columns fid, dmid, dkey, length, classid and
# devcount, or undef when no matching row exists.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE dmid=? AND dkey=?";
    return $self->dbh->selectrow_hashref($sql, undef, $dmid, $key);
}
# Look up a file by its fidid.
# Returns a hashref with the columns fid, dmid, dkey, length, classid and
# devcount, or undef when no such row exists.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
# Return an arrayref of row hashrefs (columns fid, dmid, dkey, length,
# classid, devcount) for every file whose fid lies between $fromfid and
# $tofid inclusive. fetchall_arrayref({}) yields an empty arrayref when
# nothing matches.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $tofid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE fid BETWEEN ? AND ?";
    my $sth = $self->dbh->prepare($sql);
    $sth->execute($fromfid, $tofid);
    return $sth->fetchall_arrayref({});
}
# Return the list of devids that hold a copy of $fidid (empty list if none).
sub fid_devids {
    my ($self, $fidid) = @_;
    my $devids = $self->dbh->selectcol_arrayref(
        "SELECT devid FROM file_on WHERE fid=?", undef, $fidid);
    return @{ $devids || [] };
}
# Return a hashref of { $fidid => [ $devid, $devid, ... ] } for the given
# @fidids. Fids with no file_on rows are absent from the result. An empty
# @fidids yields an empty hashref.
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $ret = {};

    # "IN ()" is a SQL syntax error, so there is nothing to query.
    return $ret unless @fidids;

    # Numify each fidid so that the interpolated IN list contains only numbers.
    my $in = join(",", map { $_ + 0 } @fidids);
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{ $ret->{$fidid} ||= [] }, $devid;
    }
    return $ret;
}
# Look up a tempfile row by fid.
# Returns a hashref with the columns classid, dmid and dkey, or undef when
# the row does not exist.
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT classid, dmid, dkey "
            . "FROM tempfile WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
# Insert a device row. The insert runs through conddup(), which handles
# duplicate errors, and condthrow() is then called. Dies if no row was
# inserted. Returns 1 on success.
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $inserted = $self->conddup(sub {
        return $self->dbh->do(
            "INSERT INTO device (devid, hostid, status) VALUES (?,?,?)",
            undef, $devid, $hostid, $status);
    });
    $self->condthrow;
    $inserted > 0 or die "error making device $devid\n";
    return 1;
}
# Record total and used megabytes for a device, stamping mb_asof with the
# database's current unix time. Named args: mb_total, mb_used, devid.
sub update_device_usage {
    my $self = shift;
    my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @_);
    my $sql = "UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = "
            . $self->unix_timestamp
            . " WHERE devid = ?";
    $self->dbh->do($sql, undef, @arg{qw(mb_total mb_used devid)});
    $self->condthrow;
}
# Record $fidid in unreachable_fids with the current database time, via
# REPLACE. Dies on databases that cannot REPLACE (subclasses must override).
sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!";
    my $sql = "REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")";
    return $self->dbh->do($sql, undef, $fidid);
}
# Set the weight column of device $devid.
sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    my $sql = 'UPDATE device SET weight = ? WHERE devid = ?';
    $self->dbh->do($sql, undef, $weight, $devid);
    $self->condthrow;
}
# Set the status column of device $devid to $state.
sub set_device_state {
    my ($self, $devid, $state) = @_;
    my $sql = 'UPDATE device SET status = ? WHERE devid = ?';
    $self->dbh->do($sql, undef, $state, $devid);
    $self->condthrow;
}
# Delete class $cid from domain $dmid.
sub delete_class {
    my ($self, $dmid, $cid) = @_;
    my $sql = "DELETE FROM class WHERE dmid = ? AND classid = ?";
    $self->dbh->do($sql, undef, $dmid, $cid);
    $self->condthrow;
}
# Remove $fidid from the file and tempfile tables, then queue it in
# file_to_delete (using the store's ignore/replace verb, so an existing
# queue entry is tolerated). condthrow() is checked after each statement.
sub delete_fidid {
    my ($self, $fidid) = @_;
    for my $sql ("DELETE FROM file WHERE fid=?", "DELETE FROM tempfile WHERE fid=?") {
        $self->dbh->do($sql, undef, $fidid);
        $self->condthrow;
    }
    my $enqueue = $self->ignore_replace . " INTO file_to_delete (fid) VALUES (?)";
    $self->dbh->do($enqueue, undef, $fidid);
    $self->condthrow;
}
# Remove the tempfile row for $fidid.
sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM tempfile WHERE fid=?";
    $self->dbh->do($sql, undef, $fidid);
    $self->condthrow;
}
# REPLACE a row into the file table with devcount reset to 0.
# Named args: fidid, dmid, key, length, classid. Dies on databases that
# cannot REPLACE (subclasses must override).
sub replace_into_file {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement replace_into_file!";
    my $sql = "REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) "
            . "VALUES (?,?,?,?,?,0) ";
    my @bind = map { $arg{$_} } qw(fidid dmid key length classid);
    $self->dbh->do($sql, undef, @bind);
    $self->condthrow;
}
# Change the key (dkey) of file $fidid to $to_key.
# Returns 1 on success and 0 on a duplicate-key error (the new key is
# already taken). Dies on any other error.
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    my $err = $@;    # capture before was_duplicate_error can clobber it
    if ($err || $dbh->err) {
        return 0 if $self->was_duplicate_error;
        # When the driver reports failure through $dbh->err instead of
        # raising, $@ is empty; fall back to errstr so the error is not lost.
        die($err || $dbh->errstr || "rename_file failed");
    }
    $self->condthrow;
    return 1;
}
# Return a flat (namespace => dmid, ...) list suitable for assigning to a
# hash: keys are domain namespaces and values are their dmids.
sub get_all_domains {
    my ($self) = @_;
    my $rows = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain') || [];
    my @pairs;
    for my $row (@$rows) {
        push @pairs, $row->[0], $row->[1];
    }
    return @pairs;
}
# Return a list of hashrefs, one per row of the class table
# (columns dmid, classid, classname, mindevcount).
sub get_all_classes {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount FROM class");
    $sth->execute;
    my @classes;
    while (my $class_row = $sth->fetchrow_hashref) {
        push @classes, $class_row;
    }
    return @classes;
}
# Record that $fidid has a copy on $devid (a file_on row).
# Both ids must be true values (croaks otherwise). Returns 1 if dowell()
# reports a positive row count and 0 otherwise.
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    $fidid or croak("fidid not non-zero");
    $devid or croak("devid not non-zero");

    # TODO: This should possibly be insert_ignore instead: when adding an
    # extra file_on entry we do not want to replace the existing one.
    # Check REPLACE semantics.
    my $sql = $self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)";
    my $affected = $self->dowell($sql, undef, $fidid, $devid);
    return $affected > 0 ? 1 : 0;
}
# Remove the record of $fidid existing on $devid.
# Returns 1 if a row was deleted and 0 if there was no such row.
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                            undef, $fidid, $devid);
    $self->condthrow;
    # DBI's do() returns the *true* string "0E0" when no rows matched.
    # Normalize so that callers testing the result in boolean context see 0
    # for "not there", as documented.
    return ($rv && $rv > 0) ? 1 : 0;
}
# Return every row of the host table as a list of hashrefs
# (hostid, status, hostname, hostip, http_port, http_get_port, altip, altmask).
sub get_all_hosts {
    my ($self) = @_;
    my $sql = "SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, "
            . "hostip, http_port, http_get_port, altip, altmask FROM host";
    my $sth = $self->dbh->prepare($sql);
    $sth->execute;
    my @hosts;
    while (my $host_row = $sth->fetchrow_hashref) {
        push @hosts, $host_row;
    }
    return @hosts;
}
# Return every row of the device table as a list of hashrefs
# (devid, hostid, mb_total, mb_used, mb_asof, status, weight).
sub get_all_devices {
    my ($self) = @_;
    my $sql = "SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, "
            . "mb_used, mb_asof, status, weight FROM device";
    my $sth = $self->dbh->prepare($sql);
    $self->condthrow;
    $sth->execute;
    my @devices;
    while (my $device_row = $sth->fetchrow_hashref) {
        push @devices, $device_row;
    }
    return @devices;
}
# Recompute file.devcount for $fidid from the number of its file_on rows.
# Always returns 1.
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $copies = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                       undef, $fidid);
    $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef, $copies, $fidid);
    return 1;
}
# Queue $fidid for replication from $from_devid (may be undef). When $in is
# true, the attempt is deferred to "database now + int($in)" seconds;
# otherwise nexttry is 0. Returns the result of insert_ignore().
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;
    # nexttry is spliced into the SQL as an expression, not bound.
    my $nexttry = $in ? $self->unix_timestamp . " + " . int($in) : 0;
    return $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) " .
                                "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
}
# Pull every deferred replication (nexttry in the future) forward to now.
# Returns the do() result, i.e. the number of rows rescheduled.
sub replicate_now {
    my ($self) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp
            . " WHERE nexttry > " . $self->unix_timestamp;
    return $self->dbh->do($sql);
}
# Return an arrayref of up to $limit fidids stored on $devid.
# Both arguments are required.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    # LIMIT is interpolated into the SQL, so force it to an integer first.
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}
# Return up to $limit (default 1,000) MogileFS::FID objects for files whose
# fid is strictly greater than $fidid, in ascending fid order.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit = int($limit || 1000);

    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid ".
                                  "FROM file ".
                                  "WHERE fid > ? ".
                                  "ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);

    my @fids;
    while (my $file_row = $sth->fetchrow_hashref) {
        push @fids, MogileFS::FID->new_from_db_row($file_row);
    }
    return @fids;
}
# Create a domain with namespace $name and return its new dmid, computed
# as MAX(dmid) + 1. Throws 'dup' on a duplicate name; dies on other errors.
# Override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    my $new_dmid = ($dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0) + 1;
    my $inserted = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $new_dmid, $name);
    };
    throw("dup") if $self->was_duplicate_error;
    return $new_dmid if $inserted;
    die "failed to make domain"; # FIXME: computing MAX(dmid)+1 is racy.
}
# Set column $col of host $hostid to $val. The update runs through
# conddup(), which handles duplicate errors. Returns 1.
# Croaks if $col is not a plain SQL identifier.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    # $col is interpolated into the SQL as a column name (placeholders can
    # only bind values), so refuse anything that is not a bare identifier.
    croak("invalid host column name")
        unless defined $col && $col =~ /\A[A-Za-z_][A-Za-z0-9_]*\z/;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    });
    return 1;
}
# Create a host row in the 'down' state and return its new hostid
# (MAX(hostid) + 1). The insert runs through conddup(), which handles
# duplicate errors. Dies with "db failure" if the insert reports failure.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # Racy, but portable; host creation is rare.
    my $max_hostid = $dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0;
    my $hid = $max_hostid + 1;
    my $inserted = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    die "db failure" unless $inserted;
    return $hid;
}
# Return up to $limit row hashrefs from file_to_replicate whose nexttry is
# due (<= database now), with columns fid, fromdevid, failcount, flags and
# nexttry. Rows are returned in ascending nexttry order. Returns an empty
# list if the query fails without raising.
sub files_to_replicate {
    my ($self, $limit) = @_;
    # LIMIT is interpolated into the SQL, so force it to an integer first.
    $limit = int($limit);
    my $ut = $self->unix_timestamp;
    # Fetch as an array of hashrefs instead of a hash keyed by fid, so that
    # the ORDER BY nexttry ordering survives (hash values have no order).
    my $rows = $self->dbh->selectall_arrayref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, { Slice => {} }) or return ();
    return @$rows;
}
# Ask the store whether this process may start replicating $fidid.
#
# Running several replicators on the same file is safe but wastes CPU/time
# and may pick different destinations (wasting disk), so subclasses can use
# this hook, paired with note_done_replicating(), to coordinate. This base
# version warns about its inefficiency and always answers yes (1).
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    return 1;
}
# Called when the replicator is done replicating a fid, so you can clean
# up whatever you did in 'should_begin_replicating_fidid'.
#
# NOTE: there's a theoretical race condition in the rebalance code, where
# (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating) all copies of a
# file can be deleted by independent replicators doing rebalancing in
# different ways. So you'll probably want to implement some locking in
# this pair of functions.
#
# This base implementation does nothing beyond unpacking its arguments.
sub note_done_replicating {
    my ($self, $fidid) = @_;
}
# Remove $fidid from the replication queue; returns the do() result.
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM file_to_replicate WHERE fid=?";
    return $self->dbh->do($sql, undef, $fidid);
}
# Set the next replication attempt for $fid to the absolute time $abstime
# and bump its failcount. Returns the do() result.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    return $self->dbh->do($sql, undef, $abstime, $fid);
}
# Set the next replication attempt for $fid to "database now + $in_n_secs"
# and bump its failcount. Returns the do() result.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, "
            . "failcount = failcount + 1 WHERE fid = ?";
    return $self->dbh->do($sql, undef, $in_n_secs, $fid);
}
# Return an arrayref of up to $limit dkeys in domain $dmid that start with
# $prefix and sort after $after, in dkey order.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # Normalize the inputs. Only undef means "no prefix" / "from the start";
    # a plain ||= would also discard a legitimate value such as "0".
    $prefix = '' unless defined $prefix;
    $prefix .= '%';    # LIKE pattern: keys beginning with $prefix
    # NOTE(review): '%' or '_' inside the caller's prefix still act as LIKE
    # wildcards; escaping them would need a backend-specific ESCAPE clause.
    $after = '' unless defined $after;
    # LIMIT is interpolated into the SQL, so force it to an integer first.
    $limit = int($limit);

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
}
# Return an arrayref of up to 50 tempfile rows (each an arrayref of
# [$fidid, $devids]) created more than $secs_old seconds ago.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    # The age is interpolated into the SQL, so force it to an integer first.
    $secs_old = int($secs_old);
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp .
                                          " - $secs_old LIMIT 50");
}
# Insert file_on rows for every MogileFS::DevFID in @devfids, using the
# store's ignore/replace verb so that rows already present are tolerated.
# Databases without multi-row VALUES support get one statement per row.
# Croaks if any DevFID has a false fidid or devid. Returns 1.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 if !@devfids;

    if (@devfids > 1 && !$self->can_insert_multi) {
        for my $devfid (@devfids) {
            $self->mass_insert_file_on($devfid);
        }
        return 1;
    }

    my @placeholders;
    my @bind;
    for my $devfid (@devfids) {
        my $fidid = $devfid->fidid;
        my $devid = $devfid->devid;
        $fidid or Carp::croak("got a false fidid");
        $devid or Carp::croak("got a false devid");
        push @bind, $fidid, $devid;
        push @placeholders, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead: when adding an
    # extra file_on entry we do not want to replace the existing one.
    # Check REPLACE semantics.
    my $sql = $self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @placeholders);
    $self->dowell($sql, undef, @bind);
    return 1;
}
# Store $ver (truncated to an integer) as the schema_version server setting.
# NOTE: "vesion" is a misspelling; the name is kept as-is because renaming
# it would break any existing callers.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    my $version = int($ver);
    return $self->set_server_setting("schema_version", $version);
}
# Return up to 500 fidids from file_to_delete_later whose delafter time is
# already in the past (by database time), i.e. deletions worth retrying.
sub fids_to_delete_again {
    my $self = shift;
    my $now = $self->unix_timestamp;
    my $fidids = $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $now
        LIMIT 500
    });
    return @{ $fidids || [] };
}
# Queue every fidid in @fidids for deletion in file_to_delete, tolerating
# ones already queued. Returns 1 on success (including an empty list);
# dies otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # Nothing to enqueue; an empty VALUES list would be a SQL syntax error.
    return 1 unless @fidids;

    # A multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub: when the first row causes the duplicate error, the
    # remaining rows are not processed. Fall back to one row at a time.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement? (fidids are int()'d, so the
    # interpolated list only contains numbers.)
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                   join(",", map { "(" . int($_) . ")" } @fidids))
        or die "file_to_delete insert failed";
    return 1;
}
# Delete every row from the fsck_log table.
# Returns 1; any error surfaces through the database handle.
sub clear_fsck_log {
    my $self = shift;
    my $dbh = $self->dbh;
    $dbh->do("DELETE FROM fsck_log");
    return 1;
}
# How many fsck_log rows to accumulate between evcode count roll-ups.
sub fsck_log_summarize_every { return 100; }
# Append an fsck_log row stamped with the database time.
# Options: fid, code (stored as evcode), devid. Croaks on any other option,
# before anything is written. Every fsck_log_summarize_every() rows, adds
# the evcode counts for the just-finished window to the
# fsck_sum_evcount_<evcode> server settings. Returns 1.
sub fsck_log {
    my ($self, %opts) = @_;
    my $fid   = delete $opts{fid};
    my $code  = delete $opts{code};
    my $devid = delete $opts{devid};
    # Validate before inserting, so a bad call does not write a row and
    # only then die.
    croak("Unknown opts") if %opts;

    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef, $fid, $code, $devid);

    my $logid = $self->dbh->last_insert_id(undef, undef, 'fsck_log', 'logid')
        or die "No last_insert_id found for fsck_log table";

    # Sum up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially huge GROUP BY in the future.
    my $SUM_EVERY = $self->fsck_log_summarize_every;
    # Note: this largely disregards locking/races because there is only one
    # fsck process running globally (in theory; there could be 5-second
    # overlaps on quick stop/starts, so races get some regard, but not much).
    if ($logid % $SUM_EVERY == 0) {
        my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
        # both bounds inclusive:
        my $min_logid = max($start_max_logid, $logid - $SUM_EVERY) + 1;
        my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]);
        while (my ($evcode, $ct) = each %$cts) {
            $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
        }
    }

    return 1;
}
# Return the database server's notion of the current unix time.
sub get_db_unixtime {
    my $self = shift;
    my $sql = "SELECT " . $self->unix_timestamp;
    return $self->dbh->selectrow_array($sql);
}
# Return the highest fid in the file table (undef when it is empty).
sub max_fidid {
    my $self = shift;
    my $sql = "SELECT MAX(fid) FROM file";
    return $self->dbh->selectrow_array($sql);
}
# Return the highest logid in fsck_log, or 0 when the table is empty.
sub max_fsck_logid {
    my $self = shift;
    my $max_logid = $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log");
    return $max_logid || 0;
}
# Return up to $limit (default 100) fsck_log rows with logid greater than
# $after_logid (default 0), as hashrefs in ascending logid order.
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);

    my @log_rows;
    while (my $log_row = $sth->fetchrow_hashref) {
        push @log_rows, $log_row;
    }
    return @log_rows;
}
# Return a hashref of { evcode => count } from fsck_log, filtered by
# exactly one of:
#   logid_range => [$min, $max]   (inclusive; wins if both are given)
#   time_gte    => $unixtime      (utime >= $unixtime; 0 is allowed)
# Croaks on unknown options or when neither filter is supplied.
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    croak("Unknown opts: " . join(", ", sort keys %opts)) if %opts;

    my $dbh = $self->dbh;
    my $sth;
    if ($logr) {
        $sth = $dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    } elsif (defined $timegte) {
        # Checked with defined() so that time_gte => 0 ("everything") works
        # instead of leaving $sth undefined.
        $sth = $dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte);
    } else {
        croak("fsck_evcode_counts requires time_gte or logid_range");
    }

    my $ret = {};
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}
# Hook run before daemonizing; subclasses may die or warn here if
# something looks amiss. The base implementation does nothing.
sub pre_daemonize_checks {
    return;
}
# Try to grab lock $lockname, giving up after $timeout seconds.
# Implementations return 1 on success and 0 on timeout, and die if a lock
# is already held. This base version only performs the recursion check
# and then dies, since locking is backend-specific.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    if ($self->{lock_depth}) {
        die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out.";
    }
    die "get_lock not implemented for $self";
}
# Release lock $lockname. Implementations return 1 on success and 0 if no
# lock we hold has that name. This base version always dies.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
# Return up to $limit (default 100 when $limit is false or truncates to 0)
# fidids found on $devid, shuffled.
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    # FIXME: this blows. Not really random: only the first 5000 rows are
    # ever considered, so a device can end up un-rebalanceable and we never
    # move on past them.
    my $candidates = $self->dbh->selectcol_arrayref(
        "SELECT fid FROM file_on WHERE devid=? LIMIT 5000", undef, $devid) || [];
    my @shuffled = List::Util::shuffle(@$candidates);

    # Keep only the first $limit entries.
    splice(@shuffled, $limit) if @shuffled > $limit;
    return @shuffled;
}
# Return a list of hashrefs { dmid, classid, devcount, count }: the number
# of files for each (dmid, classid, devcount) combination.
sub get_stats_files_per_devcount {
    my ($self) = @_;
    my $sth = $self->dbh->prepare('SELECT dmid, classid, devcount, COUNT(devcount) AS "count" FROM file GROUP BY 1, 2, 3');
    $sth->execute;
    my @stats;
    while (my $stat_row = $sth->fetchrow_hashref) {
        push @stats, $stat_row;
    }
    return @stats;
}
# A module loaded via use/require must end by evaluating to a true value.
1;

__END__