File Coverage

blib/lib/AnyEvent/Gearman/Worker/Connection.pm
Criterion Covered Total %
statement 57 73 78.0
branch 4 8 50.0
condition 2 2 100.0
subroutine 16 19 84.2
pod 8 11 72.7
total 87 113 76.9


line stmt bran cond sub pod time code
1             package AnyEvent::Gearman::Worker::Connection;
2 3     3   17 use Any::Moose;
  3         7  
  3         19  
3 3     3   1221 use Scalar::Util 'weaken';
  3         7  
  3         399  
4             require bytes;
5              
6 3     3   19 use AnyEvent::Gearman::Constants;
  3         6  
  3         508  
7 3     3   2216 use AnyEvent::Gearman::Job;
  3         403  
  3         239  
8              
9             extends 'AnyEvent::Gearman::Connection';
10              
11             has grabbing => (
12             is => 'rw',
13             isa => 'Bool',
14             default => 0,
15             );
16              
17 3     3   22 no Any::Moose;
  3         6  
  3         17  
18              
19             sub request {
20 15     15 1 38 my ($self, $type, $args) = @_;
21 15   100     118 $args ||= '';
22              
23             $self->add_on_ready(
24             sub {
25 13     13   1447 $self->handler->push_write(
26             "\0REQ" . pack('NN', $type, bytes::length($args)) . $args
27             );
28             },
29             sub {
30 0     0   0 warn sprintf 'Failed to send request to "%s": %s', $self->hostspec, $!;
31             },
32 15         298 );
33 15         2263 weaken $self;
34             }
35              
36             sub register_function {
37 3     3 1 20 my ($self, $func_name) = @_;
38              
39 3         22 my $prefix = $self->context->prefix;
40 3 50       13 $func_name = "$prefix\t$func_name" if $prefix;
41              
42 3         17 $self->can_do($func_name);
43 3 100       31 $self->grab_job unless $self->grabbing;
44             }
45              
46             sub unregister_function {
47 0     0 1 0 my ($self, $func_name) = @_;
48              
49 0         0 my $prefix = $self->context->prefix;
50 0 0       0 $func_name = "$prefix\t$func_name" if $prefix;
51              
52 0         0 $self->cant_do($func_name);
53             }
54              
55             sub can_do {
56 3     3 1 24 my ($self, $func_name) = @_;
57 3         30 $self->request(CAN_DO, $func_name);
58             }
59              
60             sub cant_do {
61 0     0 1 0 my ($self, $func_name) = @_;
62 0         0 $self->request(CANT_DO, $func_name);
63             }
64              
65             sub grab_job {
66 7     7 1 22 my $self = shift;
67 7         38 $self->grabbing(1);
68 7         33 $self->request(GRAB_JOB);
69             }
70              
71             sub pre_sleep {
72 2     2 1 5 my $self = shift;
73 2         10 $self->request(PRE_SLEEP);
74             }
75              
76             sub process_packet_6 { # NOOP
77 2     2 0 7 my ($self, $len) = @_;
78 2         8 $self->grab_job;
79             }
80              
81             sub process_packet_10 { # NO_JOB
82 2     2 0 5 my ($self, $len) = @_;
83 2         62 $self->grabbing(0);
84 2         9 $self->pre_sleep;
85             }
86              
87             sub process_packet_11 { # JOB_ASSIGN
88 3     3 0 12 my ($self, $len) = @_;
89 3         14 my $handle = $self->handler;
90              
91 3         29 $self->grabbing(0);
92              
93             $handle->unshift_read( line => "\0", sub {
94 3     3   153 my $job_handle = $_[1];
95 3         20 $len -= bytes::length($job_handle) + 1;
96              
97             $_[0]->unshift_read( line => "\0", sub {
98 3         165 my $function = $_[1];
99 3         16 $len -= bytes::length($function) + 1;
100              
101             $_[0]->unshift_read( chunk => $len, sub {
102 3         896 my $workload = $_[1];
103              
104             my $job = AnyEvent::Gearman::Job->new(
105             $function => $workload,
106             on_complete => sub {
107 3         307 my ($job, $result) = @_;
108 3         20 $self->request(WORK_COMPLETE, "$job_handle\0$result");
109 3         13 $self->grab_job();
110             },
111             on_data => sub {
112 0         0 my ($job, $data) = @_;
113 0         0 $self->request(WORK_DATA, "$job_handle\0$data");
114             },
115             on_fail => sub {
116 0         0 my ($job) = @_;
117 0         0 $self->request(WORK_FAIL, $job_handle);
118 0         0 $self->grab_job();
119             },
120             on_status => sub {
121 0         0 my ($job, $numerator, $denominator) = @_;
122 0         0 $self->request(
123             WORK_STATUS, "$job_handle\0$numerator\0$denominator"
124             );
125             },
126             on_warning => sub {
127 0         0 my ($job, $warning) = @_;
128 0         0 $self->request(WORK_WARNING, "$job_handle\0$warning");
129             },
130 3         235 );
131 3         1869 $self->work( $job );
132 3         46 });
133 3         49 });
134 3         27 });
135 3         411 weaken $self;
136             }
137              
138             sub work {
139 3     3 1 7 my ($self, $job) = @_;
140              
141 3 50       82 my $cb = $self->context->functions->{ $job->function } or return;
142 3         17 $cb->($job);
143             }
144              
145             __PACKAGE__->meta->make_immutable;
146              
147             __END__