File Coverage

blib/lib/Gearman/Driver/Observer.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Gearman::Driver::Observer;
2              
3 1     1   1946 use Moose;
  0            
  0            
4             use Net::Telnet::Gearman;
5             use POE;
6              
7             =head1 NAME
8              
9             Gearman::Driver::Observer - Observes Gearman status interface
10              
11             =head1 DESCRIPTION
12              
13             Each n seconds L<Net::Telnet::Gearman> is used to fetch status of
14             free/running/busy workers from the Gearman server. L<Gearman::Driver>
15             decides to fork more workers depending on the queue size and the
16             MinProcesses/MaxProcesses attribute of the job methods.
17              
18             Currently there's no public interface.
19              
20             =cut
21              
22             has 'callback' => (
23             is => 'rw',
24             isa => 'CodeRef',
25             required => 1,
26             );
27              
28             has 'interval' => (
29             is => 'rw',
30             isa => 'Int',
31             required => 1,
32             );
33              
34             has 'server' => (
35             is => 'rw',
36             isa => 'Str',
37             required => 1,
38             );
39              
40             has 'telnet' => (
41             auto_deref => 1,
42             default => sub { [] },
43             is => 'ro',
44             isa => 'ArrayRef[Net::Telnet::Gearman]',
45             );
46              
47             has 'session' => (
48             is => 'ro',
49             isa => 'POE::Session',
50             );
51              
52             sub BUILD {
53             my ($self) = @_;
54              
55             $self->_connect();
56              
57             $self->{session} = POE::Session->create(
58             object_states => [
59             $self => {
60             _start => '_start',
61             fetch_status => '_fetch_status'
62             }
63             ]
64             );
65             }
66              
67             sub _start {
68             $_[KERNEL]->delay( fetch_status => $_[OBJECT]->interval );
69             }
70              
71             sub _connect {
72             my ($self) = @_;
73              
74             $self->{telnet} = [];
75              
76             foreach my $server ( split /,/, $self->server ) {
77             my ( $host, $port ) = split /:/, $server;
78              
79             my $telnet = Net::Telnet::Gearman->new(
80             Host => $host || 'localhost',
81             Port => $port || 4730,
82             );
83              
84             push @{ $self->{telnet} }, $telnet;
85             }
86             }
87              
88             sub _fetch_status {
89             my %data = ();
90             my @error = ();
91              
92             foreach my $telnet ( $_[OBJECT]->telnet ) {
93             eval {
94             my $status = $telnet->status;
95              
96             foreach my $row (@$status) {
97             $data{ $row->name } ||= {
98             name => $row->name,
99             busy => 0,
100             free => 0,
101             queue => 0,
102             running => 0,
103             };
104             $data{ $row->name }{busy} += $row->busy;
105             $data{ $row->name }{free} += $row->free;
106             $data{ $row->name }{queue} += $row->queue;
107             $data{ $row->name }{running} += $row->running;
108             }
109             };
110              
111             # Try to re-open the telnet connection
112             if ($@) {
113             push @error, $@ if $@;
114             eval { $telnet->open };
115             }
116             }
117              
118             $_[OBJECT]->callback->( { data => [ values %data ], error => \@error } );
119              
120             $_[KERNEL]->delay( fetch_status => $_[OBJECT]->interval );
121             }
122              
123             no Moose;
124              
125             __PACKAGE__->meta->make_immutable;
126              
127             =head1 AUTHOR
128              
129             See L<Gearman::Driver>.
130              
131             =head1 COPYRIGHT AND LICENSE
132              
133             See L<Gearman::Driver>.
134              
135             =head1 SEE ALSO
136              
137             =over 4
138              
139             =item * L<Gearman::Driver>
140              
141             =item * L<Gearman::Driver::Adaptor>
142              
143             =item * L<Gearman::Driver::Console>
144              
145             =item * L<Gearman::Driver::Console::Basic>
146              
147             =item * L<Gearman::Driver::Console::Client>
148              
149             =item * L<Gearman::Driver::Job>
150              
151             =item * L<Gearman::Driver::Job::Method>
152              
153             =item * L<Gearman::Driver::Loader>
154              
155             =item * L<Gearman::Driver::Worker>
156              
157             =back
158              
159             =cut
160              
161             1;