File Coverage

blib/lib/AnyEvent/Gearman/Client/Connection.pm
Criterion Covered Total %
statement 41 82 50.0
branch 2 14 14.2
condition n/a
subroutine 11 21 52.3
pod 1 9 11.1
total 55 126 43.6


line stmt bran cond sub pod time code
1             package AnyEvent::Gearman::Client::Connection;
2 6     6   134 use Any::Moose;
  6         14  
  6         64  
3 6     6   5728 use Scalar::Util 'weaken';
  6         15  
  6         739  
4              
5             extends 'AnyEvent::Gearman::Connection';
6              
7 6     6   45 no Any::Moose;
  6         4656  
  6         53  
8              
9             sub add_task {
10 12     12 1 49 my ($self, $task, $on_complete, $on_error, $type) = @_;
11              
12             $self->add_on_ready(
13             sub {
14 11     11   27 push @{ $self->_need_handle }, $task;
  11         90  
15 11         100 $self->handler->push_write( $task->pack_req($type) );
16 11         8670 $on_complete->();
17             },
18 12         309 $on_error,
19             );
20 12         96 weaken($self);
21              
22 12         85 return;
23             }
24              
25             sub process_work { # common handler for WORK_*
26 9     9 0 36 my ($self, $len, $cb) = @_;
27 9         45 my $handle = $self->handler;
28              
29             $handle->unshift_read( line => "\0", sub {
30 9     9   845 my $job_handle = $_[1];
31 9         31 $len -= length($job_handle) + 1;
32              
33             $_[0]->unshift_read( chunk => $len, sub {
34 9         415 $cb->( $job_handle, $_[1] );
35 9         71 });
36 9         406 });
37             }
38              
39             sub process_packet_8 { # JOB_CREATED
40 10     10 0 35 my ($self, $len) = @_;
41              
42 10         40 my $handle = $self->handler;
43              
44             $handle->unshift_read( chunk => $len, sub {
45 10     10   237 my $job_handle = $_[1];
46 10 50       18 my $task = shift @{ $self->_need_handle } or return;
  10         83  
47              
48 10         82 $task->job_handle($job_handle);
49 10         80 $self->_job_handles->{ $job_handle } = $task;
50 10         129 $task->event( 'on_created' );
51 10         139 });
52 10         386 weaken $self;
53             }
54              
55             sub process_packet_12 { # WORK_STATUS
56 0     0 0 0 my ($self, $len) = @_;
57 0         0 my $handle = $self->handler;
58              
59             $handle->unshift_read( line => "\0", sub {
60 0     0   0 my $job_handle = $_[1];
61 0         0 $len -= length($_[1]) + 1;
62              
63             $_[0]->unshift_read( line => "\0", sub {
64 0         0 my $numerator = $_[1];
65 0         0 $len -= length($_[1]) + 1;
66              
67             $_[0]->unshift_read( chunk => $len, sub {
68 0         0 my $denominator = $_[1];
69              
70 0 0       0 my $task = $self->_job_handles->{ $job_handle } or return;
71 0         0 $task->event( on_status => $numerator, $denominator );
72 0         0 });
73 0         0 });
74 0         0 });
75 0         0 weaken $self;
76             }
77              
78             sub process_packet_13 { # WORK_COMPLETE
79 9     9 0 34 my ($self) = @_;
80              
81             push @_, sub {
82 9     9   28 my ($job_handle, $data) = @_;
83              
84 9 50       376 my $task = delete $self->_job_handles->{ $job_handle } or return;
85 9         81 $task->event( on_complete => $data );
86 9         121 };
87 9         47 weaken $self;
88              
89 9         62 goto \&process_work;
90             }
91              
92             sub process_packet_14 { # WORK_FAIL
93 0     0 0   my ($self, $len) = @_;
94 0           my $handle = $self->handler;
95              
96             $handle->unshift_read( chunk => $len, sub {
97 0     0     my $job_handle = $_[1];
98 0 0         my $task = delete $self->_job_handles->{ $job_handle } or return;
99 0           $task->event('on_fail');
100 0           });
101 0           weaken $self;
102             }
103              
104             sub process_packet_25 { # WORK_EXCEPTION
105 0     0 0   my ($self) = @_;
106              
107             push @_, sub {
108 0     0     my ($job_handle, $data) = @_;
109 0 0         my $task = $self->_job_handles->{ $job_handle } or return;
110 0           $task->event( on_exception => $data );
111 0           };
112 0           Scalar::Util::weaken($self);
113              
114 0           goto \&process_work;
115             }
116              
117             sub process_packet_28 { # WORK_DATA
118 0     0 0   my ($self) = @_;
119              
120             push @_, sub {
121 0     0     my ($job_handle, $data) = @_;
122              
123 0 0         my $task = $self->_job_handles->{ $job_handle } or return;
124 0           $task->event( on_data => $data );
125 0           };
126 0           weaken $self;
127              
128 0           goto \&process_work;
129             }
130              
131             sub process_packet_29 { # WORK_WARNING
132 0     0 0   my ($self) = @_;
133              
134             push @_, sub {
135 0     0     my ($job_handle, $data) = @_;
136 0 0         my $task = $self->_job_handles->{ $job_handle } or return;
137              
138 0           $task->event( on_warning => $data );
139 0           };
140 0           weaken $self;
141              
142 0           goto \&process_work;
143             }
144              
145             __PACKAGE__->meta->make_immutable;
146              
147             __END__