File Coverage

blib/lib/Broker/Async/Worker.pm
Criterion Covered Total %
statement 34 35 97.1
branch 4 6 66.6
condition 1 3 33.3
subroutine 9 9 100.0
pod 1 3 33.3
total 49 56 87.5


line stmt bran cond sub pod time code
1             package Broker::Async::Worker;
2 7     7   367 use strict;
  7         8  
  7         153  
3 7     7   20 use warnings;
  7         7  
  7         124  
4 7     7   19 use Carp;
  7         7  
  7         406  
5 7     7   24 use Scalar::Util qw( blessed weaken );
  7         7  
  7         964  
6              
7             =head1 NAME
8              
9             Broker::Async::Worker
10              
11             =head1 DESCRIPTION
12              
13             Used by L for tracking the state of asynchronous work.
14              
15             =cut
16              
17             our $VERSION = "0.0.1"; # __VERSION__
18              
19             =head1 ATTRIBUTES
20              
21             =head2 code
22             The code reference used to start the work.
23             This will be invoked with the arguments passed to C.
24              
25             Must return a L subclass.
26              
27             =head2 concurrency
28              
29             The number of concurrent tasks a worker can execute.
30             Do'ing more tasks than this limit is a fatal error.
31              
32             Defaults to 1.
33              
34             =cut
35              
36             use Class::Tiny qw( code ), {
37 17         449 concurrency => sub { 1 },
38 19         166 futures => sub { +{} },
39 19         323 available => sub { shift->concurrency },
40 7     7   2983 };
  7         14918  
  7         55  
41              
42             =head1 METHODS
43              
44             =head2 new
45              
46             my $worker = Broker::Async::Worker->new(
47             code => sub { ... },
48             concurrency => $max,
49             );
50              
51             =head2 available
52              
53             Indicates whether the worker is available to C tasks.
54             It is a fatal error to invoke C when this is false.
55              
56             =head2 do
57              
58             my $future = $worker->do($task);
59              
60             Invokes the code attribute with the given arguments.
61             Returns a future that will be resolved when the work is done.
62              
63             =cut
64              
65             sub active {
66 2015     2015 0 29316 my ($self) = @_;
67 2015         9613 return values %{ $self->futures };
  2015         107829  
68             }
69              
70             sub BUILD {
71 26     26 0 5861 my ($self) = @_;
72 26         28 for my $name (qw( code )) {
73 26 50       426 croak "$name attribute required" unless defined $self->$name;
74             }
75             }
76              
77             sub do {
78 136     136 1 4591 weaken(my $self = shift);
79 136         3765 my (@args) = @_;
80 136 100       38044 if (not( $self->available )) {
81 1         142 croak "worker $self is not available for work";
82             }
83              
84 135         46021 my $f = $self->code->(@args);
85 135 50 33     26117 if (not( blessed($f) and $f->isa('Future') )) {
86 0         0 croak "code for worker $self did not return a Future: returned $f";
87             }
88 135         38232 $self->available( $self->available - 1 );
89              
90             return $self->futures->{$f} = $f->on_ready(sub{
91 130     130   323412 delete $self->futures->{$f};
92 130         45702 $self->available( $self->available + 1 );
93 135         19535 });
94             }
95              
96             =head1 AUTHOR
97              
98             Mark Flickinger Emaf@cpan.orgE
99              
100             =head1 LICENSE
101              
102             This software is licensed under the same terms as Perl itself.
103              
104             =cut
105              
106              
107             1;