File Coverage

blib/lib/MogileFS/Connection/Worker.pm
Criterion Covered Total %
statement 12 63 19.0
branch 0 16 0.0
condition 0 3 0.0
subroutine 4 16 25.0
pod 6 12 50.0
total 22 110 20.0


line stmt bran cond sub pod time code
1             package MogileFS::Connection::Worker;
2             # This class maintains a connection to one of the various classes of
3             # workers.
4              
5 21     21   127 use strict;
  21         38  
  21         468  
6 21     21   84 use Danga::Socket ();
  21         31  
  21         299  
7 21     21   76 use base qw{Danga::Socket};
  21         33  
  21         2540  
8              
9             use fields (
10 21         119 'read_buf',
11             'read_size', # bigger for monitor
12             'job',
13             'pid',
14             'reqid',
15             'last_alive', # unixtime
16             'known_state', # hashref of { "$what-$whatid" => $state }
17             'wants_todo', # count of how many jobs worker wants.
18 21     21   121 );
  21         40  
19              
20             sub new {
21 0     0 1   my MogileFS::Connection::Worker $self = shift;
22 0 0         $self = fields::new($self) unless ref $self;
23 0           $self->SUPER::new( @_ );
24              
25 0           $self->{pid} = 0;
26 0           $self->{reqid} = 0;
27 0           $self->{wants_todo} = {};
28 0           $self->{job} = undef;
29 0           $self->{last_alive} = time();
30 0           $self->{known_state} = {};
31 0           $self->{read_size} = 1024;
32              
33 0           return $self;
34             }
35              
36             sub note_alive {
37 0     0 0   my $self = shift;
38 0           $self->{last_alive} = time();
39             }
40              
41             sub watchdog_check {
42 0     0 0   my MogileFS::Connection::Worker $self = shift;
43              
44 0           my $timeout = $self->worker_class->watchdog_timeout;
45 0           my $time_since_last_alive = time() - $self->{last_alive};
46 0           return $time_since_last_alive < $timeout;
47             }
48              
49             sub event_read {
50 0     0 1   my MogileFS::Connection::Worker $self = shift;
51              
52             # if we read data from it, it's not blocked on something else.
53 0           $self->note_alive;
54              
55 0           my $bref = $self->read($self->{read_size});
56 0 0         return $self->close() unless defined $bref;
57 0           $self->{read_buf} .= $$bref;
58              
59 0           while ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
60 0           my $line = $1;
61 0 0 0       if ($self->job eq 'queryworker' && $line !~ /^(?:\:|error|debug)/) {
62 0           MogileFS::ProcManager->HandleQueryWorkerResponse($self, $line);
63             } else {
64 0           MogileFS::ProcManager->HandleChildRequest($self, $line);
65             }
66             }
67             }
68              
69             sub event_write {
70 0     0 1   my $self = shift;
71 0           my $done = $self->write(undef);
72 0 0         $self->watch_write(0) if $done;
73             }
74              
75             sub job {
76 0     0 0   my MogileFS::Connection::Worker $self = shift;
77 0 0         return $self->{job} unless @_;
78 0           my $j = shift;
79              
80             # monitor may send huge state events (which we send to everyone else)
81 0 0         $self->{read_size} = Mgd::UNIX_RCVBUF_SIZE() if ($j eq 'monitor');
82 0           $self->{job} = $j;
83             }
84              
85             sub wants_todo {
86 0     0 0   my MogileFS::Connection::Worker $self = shift;
87 0           my $type = shift;
88 0 0         return $self->{wants_todo}->{$type}-- unless @_;
89 0           return $self->{wants_todo}->{$type} = shift;
90             }
91              
92             sub worker_class {
93 0     0 0   my MogileFS::Connection::Worker $self = shift;
94 0           return MogileFS::ProcManager->job_to_class($self->{job});
95             }
96              
97             sub pid {
98 0     0 0   my MogileFS::Connection::Worker $self = shift;
99 0 0         return $self->{pid} unless @_;
100 0           return $self->{pid} = shift;
101             }
102              
103 0     0 1   sub event_hup { my $self = shift; $self->close; }
  0            
104 0     0 1   sub event_err { my $self = shift; $self->close; }
  0            
105              
106             sub close {
107             # mark us as being dead
108 0     0 1   my MogileFS::Connection::Worker $self = shift;
109 0           MogileFS::ProcManager->NoteDeadWorkerConn($self);
110 0           $self->SUPER::close(@_);
111             }
112              
113             1;
114              
115             # Local Variables:
116             # mode: perl
117             # c-basic-indent: 4
118             # indent-tabs-mode: nil
119             # End: