File Coverage

blib/lib/MogileFS/Connection/Client.pm
Criterion Covered Total %
statement 12 87 13.7
branch 0 40 0.0
condition 0 3 0.0
subroutine 4 14 28.5
pod 5 7 71.4
total 21 151 13.9


line stmt bran cond sub pod time code
1             # A client is a user connection for sending requests to us. Requests
2             # can either be normal user requests to be sent to a QueryWorker
3             # or management requests that start with a !.
4              
5             package MogileFS::Connection::Client;
6              
7 7     7   72 use strict;
  7         10  
  7         248  
8 7     7   12440 use Danga::Socket ();
  7         213155  
  7         248  
9 7     7   65 use base qw{Danga::Socket};
  7         15  
  7         1592  
10              
11 7     7   45 use fields qw{read_buf};
  7         15  
  7         73  
12              
13             sub new {
               # Constructor. The invocant may be a class name or an existing object;
               # a non-reference invocant is first turned into a fields-based object
               # via fields::new. All remaining arguments are passed unchanged to the
               # parent class constructor (the base class is Danga::Socket).
               # Returns the object after calling watch_read(1) on it.
14 0     0 1   my $self = shift;
15 0 0         $self = fields::new($self) unless ref $self;
16 0           $self->SUPER::new( @_ );
17 0           $self->watch_read(1);
18 0           return $self;
19             }
20              
21             # Client
22             sub event_read {
               # Pulls up to 1024 bytes from the connection and appends them to the
               # read_buf field. If read() returns undef the connection is closed and
               # the result of close() is returned. Every complete line found in the
               # buffer (terminated by "\n", with an optional preceding "\r") is removed
               # from the buffer; empty lines are skipped and all others are passed to
               # handle_request. An unterminated trailing fragment stays in read_buf
               # until a later call completes it.
23 0     0 1   my MogileFS::Connection::Client $self = shift;
24              
25 0           my $bref = $self->read(1024);
26 0 0         return $self->close unless defined $bref;
27 0           $self->{read_buf} .= $$bref;
28              
29 0           while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
30 0 0         next unless length $1;
31 0           $self->handle_request($1);
32             }
33             }
34              
35             sub handle_request {
               # Routes one request line ($line, already stripped of its line ending).
               # A line of the form "!<command> [<arguments>]" is treated as an admin
               # command and handed to handle_admin_command, whose result is returned;
               # the argument part is optional, so $args may be undef. Any other line
               # is handed to MogileFS::ProcManager->EnqueueCommandRequest together
               # with this client object.
36 0     0 0   my ($self, $line) = @_;
37              
38             # if it's just 'help', 'h', '?', or something, do that
39             #if ((substr($line, 0, 1) eq '?') || ($line eq 'help')) {
40             # MogileFS::ProcManager->SendHelp($_[1]);
41             # return;
42             #}
43              
44 0 0         if ($line =~ /^!(\S+)(?:\s+(.+))?$/) {
45 0           my ($cmd, $args) = ($1, $2);
46 0           return $self->handle_admin_command($cmd, $args);
47             }
48              
49 0           MogileFS::ProcManager->EnqueueCommandRequest($line, $self);
50             }
51              
52             sub handle_admin_command {
               # Executes one admin ("!") command for this client.
               #   $cmd  - command word (text after the "!")
               #   $args - remainder of the line; NOTE(review): may be undef when the
               #           command was sent without arguments - confirm callers
               # Output lines are collected in @out and written joined by "\r\n";
               # every response is then terminated by a line holding a single ".".
               # Apart from "stats" (exact match) the command regexes are only
               # anchored at the start, so any word beginning with the prefix selects
               # that branch, and earlier branches win. An unrecognised command falls
               # through to MogileFS::ProcManager->SendHelp. Returns nothing.
53 0     0 0   my ($self, $cmd, $args) = @_;
54              
55 0           my @out;
56 0 0 0       if ($cmd =~ /^stats$/) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
57             # print out some stats on the queues
58 0           my $uptime = time() - MogileFS::ProcManager->server_starttime;
59 0           my $ccount = MogileFS::ProcManager->PendingQueryCount;
60 0           my $wcount = MogileFS::ProcManager->BoredQueryWorkerCount;
61 0           my $ipcount = MogileFS::ProcManager->QueriesInProgressCount;
62 0           my $stats = MogileFS::ProcManager->StatsHash;
63 0           push @out, "uptime $uptime",
64             "pending_queries $ccount",
65             "processing_queries $ipcount",
66             "bored_queryworkers $wcount",
67 0           map { "$_ $stats->{$_}" } sort keys %$stats;
68              
69             } elsif ($cmd =~ /^repl/) {
               # for every class, emit one "<domain> <class> <devcount> <nfiles>" line
               # per device count from 1 up to one below the class's mindevcount
70 0           my $sto = Mgd::get_store();
71             MogileFS::Class->foreach(sub {
72 0     0     my ($cl, $dmid, $clid) = @_;
73 0           my $dmname = $cl->domain->name;
74 0           my $clname = $cl->name;
75 0           foreach my $ct (1..($cl->mindevcount - 1)) {
76 0           my $count = $sto->nfiles_with_dmid_classid_devcount($dmid, $clid, $ct);
77 0           push @out, "$dmname $clname $ct $count";
78             }
79 0           });
80              
81             } elsif ($cmd =~ /^shutdown/) {
               # NOTE(review): $args is interpolated even when no reason was given
82 0           print "User requested shutdown: $args\n";
83 0           kill 15, $$; # kill us, that kills our kids
84              
85             } elsif ($cmd =~ /^jobs/) {
86             # dump out a list of running jobs and pids
87             MogileFS::ProcManager->foreach_job(sub {
88 0     0     my ($job, $ct, $desired, $pidlist) = @_;
89 0           push @out, "$job count $ct";
90 0           push @out, "$job desired $desired";
91 0           push @out, "$job pids " . join(' ', @$pidlist);
92 0           });
93              
94             } elsif ($cmd =~ /^want/) {
95             # !want <count> <jobclass>
96             # set the new desired staffing level for a class
97 0 0         if ($args =~ /^(\d+)\s+(\S+)/) {
98 0           my ($count, $job) = ($1, $2);
99              
               # requested counts above 500 are silently capped at 500
100 0 0         $count = 500 if $count > 500;
101              
102             # now make sure it's a real job
103 0 0         if (MogileFS::ProcManager->is_valid_job($job)) {
104 0           MogileFS::ProcManager->request_job_process($job, $count);
105 0           push @out, "Now desiring $count children doing '$job'.";
106             } else {
107 0           my $classes = join(", ", MogileFS::ProcManager->valid_jobs);
108 0           push @out, "ERROR: Invalid class '$job'. Valid classes: $classes";
109             }
110             } else {
                # NOTE(review): usage text looks truncated (argument placeholders
                # missing after "!want") - confirm against the original source
111 0           push @out, "ERROR: usage: !want ";
112             }
113              
114             } elsif ($cmd =~ /^to/) {
115             # !to <jobclass> <message>
116             # sends <message> to all children of <jobclass>
117 0 0         if ($args =~ /^(\S+)\s+(.+)/) {
118 0           my $ct = MogileFS::ProcManager->ImmediateSendToChildrenByJob($1, $2);
119 0           push @out, "Message sent to $ct children.";
120              
121             } else {
                # NOTE(review): usage text looks truncated (argument placeholders
                # missing after "!to") - confirm against the original source
122 0           push @out, "ERROR: usage: !to ";
123             }
124              
125             } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
                # list every pending query, one per output line
126             MogileFS::ProcManager->foreach_pending_query(sub {
127 0     0     my ($client, $query) = @_;
128 0           push @out, $query;
129 0           });
130              
131             } elsif ($cmd =~ /^watch/) {
                # toggle: if removing this client from the error watchers succeeds
                # it was subscribed and is now off; otherwise subscribe it
132 0 0         if (MogileFS::ProcManager->RemoveErrorWatcher($self)) {
133 0           push @out, "Removed you from watcher list.";
134             } else {
135 0           MogileFS::ProcManager->AddErrorWatcher($self);
136 0           push @out, "Added you to watcher list.";
137             }
138              
139             } elsif ($cmd =~ /^recent/) {
140             # show the most recent N queries
141 0           push @out, MogileFS::ProcManager->RecentQueries;
142              
143             } elsif ($cmd =~ /^version/) {
144             # show the server version
145 0           push @out, $MogileFS::Server::VERSION;
146              
147             } else {
148 0           MogileFS::ProcManager->SendHelp($self, $args);
149             }
150              
                # collected output (if any), then the "." terminator line
151 0 0         $self->write(join("\r\n", @out) . "\r\n") if @out;
152 0           $self->write(".\r\n");
153 0           return;
154             }
155              
156             # Client
157 0     0 1   sub event_err { my $self = shift; $self->close; } # error event: tear the connection down through close()
  0            
158 0     0 1   sub event_hup { my $self = shift; $self->close; } # hangup event: tear the connection down through close()
  0            
159              
160             # just note that we've died
161             sub close {
                # Closes this client connection. MogileFS::ProcManager is told about
                # the dead client first, then the parent class close() is called with
                # the same arguments; its result is this sub's return value.
162             # mark us as being dead
163 0     0 1   my $self = shift;
164 0           MogileFS::ProcManager->NoteDeadClient($self);
165 0           $self->SUPER::close(@_);
166             }
167              
168             1;
169              
170             # Local Variables:
171             # mode: perl
172             # c-basic-indent: 4
173             # indent-tabs-mode: nil
174             # End: