File Coverage

blib/lib/Argon/Client.pm
Criterion Covered Total %
statement 90 118 76.2
branch 9 14 64.2
condition 1 5 20.0
subroutine 25 29 86.2
pod 4 8 50.0
total 129 174 74.1


line stmt bran cond sub pod time code
1             package Argon::Client;
2             # ABSTRACT: Client-side connection class for Argon systems
3             $Argon::Client::VERSION = '0.18';
4              
5 2     2   1369 use strict;
  2         6  
  2         74  
6 2     2   15 use warnings;
  2         6  
  2         82  
7 2     2   14 use Carp;
  2         5  
  2         142  
8 2     2   13 use Moose;
  2         7  
  2         19  
9 2     2   17572 use AnyEvent;
  2         6  
  2         81  
10 2     2   744 use AnyEvent::Socket qw(tcp_connect);
  2         20363  
  2         197  
11 2     2   1652 use Data::Dump::Streamer;
  2         145453  
  2         25  
12 2     2   1371 use Argon;
  2         9  
  2         85  
13 2     2   677 use Argon::Async;
  2         10  
  2         88  
14 2     2   22 use Argon::Constants qw(:commands :priorities);
  2         7  
  2         519  
15 2     2   20 use Argon::SecureChannel;
  2         6  
  2         68  
16 2     2   19 use Argon::Log;
  2         5  
  2         195  
17 2     2   20 use Argon::Message;
  2         6  
  2         64  
18 2     2   17 use Argon::Types;
  2         7  
  2         72  
19 2     2   16 use Argon::Util qw(K param interval);
  2         7  
  2         4062  
20              
21             with qw(Argon::Encryption);
22              
23              
24             has host => (
25             is => 'ro',
26             isa => 'Str',
27             required => 1,
28             );
29              
30              
31             has port => (
32             is => 'ro',
33             isa => 'Int',
34             required => 1,
35             );
36              
37              
38             has retry => (
39             is => 'ro',
40             isa => 'Bool',
41             default => 0,
42             );
43              
44              
45             has opened => (
46             is => 'rw',
47             isa => 'Ar::Callback',
48             default => sub { sub {} },
49             );
50              
51              
52             has ready => (
53             is => 'rw',
54             isa => 'Ar::Callback',
55             default => sub { sub {} },
56             );
57              
58              
59             has failed => (
60             is => 'rw',
61             isa => 'Ar::Callback',
62             default => sub { sub {} },
63             );
64              
65              
66             has closed => (
67             is => 'rw',
68             isa => 'Ar::Callback',
69             default => sub { sub {} },
70             );
71              
72              
73             has notify => (
74             is => 'rw',
75             isa => 'Ar::Callback',
76             default => sub { sub {} },
77             );
78              
79             has remote => (
80             is => 'rw',
81             isa => 'Maybe[Str]',
82             );
83              
84             has msg => (
85             is => 'rw',
86             isa => 'HashRef',
87             default => sub {{}},
88             traits => ['Hash'],
89             handles => {
90             has_msg => 'exists',
91             get_msg => 'get',
92             add_msg => 'set',
93             del_msg => 'delete',
94             msg_ids => 'keys',
95             msgs => 'values',
96             },
97             );
98              
99             has channel => (
100             is => 'rw',
101             isa => 'Maybe[Argon::SecureChannel]',
102             handles => [qw(send)],
103             );
104              
105             has addr => (
106             is => 'ro',
107             isa => 'Str',
108             lazy => 1,
109             builder => '_build_addr',
110             );
111              
112             sub _build_addr {
113 3     3   10 my $self = shift;
114 3         99 join ':', $self->host, $self->port;
115             }
116              
117              
118             around BUILDARGS => sub {
119             my $orig = shift;
120             my $class = shift;
121             my %args = @_;
122              
123             if (exists $args{channel}) {
124             # Match encryption settings
125             $args{$_} = $args{channel}{$_}
126             foreach grep { exists $args{channel}{$_} }
127             qw(key keyfile cipher token);
128             }
129              
130             $class->$orig(%args);
131             };
132              
133             sub BUILD {
134 5     5 0 19 my ($self, $args) = @_;
135              
136 5 100       234 if ($self->channel) {
137             # Set callbacks
138 3         166 $self->channel->on_msg(K('_notify', $self));
139 3         97 $self->channel->on_err(K('_error', $self));
140 3         101 $self->channel->on_close(K('_close', $self));
141              
142 3 50       108 if ($self->channel->is_ready) {
143 0         0 $self->opened->();
144 0         0 $self->ready->();
145             } else {
146 3         98 $self->channel->on_ready(K('_ready', $self));
147 3         100 $self->opened->();
148             }
149             }
150             else {
151 2         9 $self->connect;
152             }
153             }
154              
155             sub connect {
156 2     2 0 14 my $self = shift;
157 2         54 log_debug 'Connecting to %s', $self->addr;
158 2         234 tcp_connect $self->host, $self->port, K('_connected', $self);
159             }
160              
161             sub _connected {
162 2     2   9 my ($self, $fh) = @_;
163              
164 2 100       17 if ($fh) {
165 1         36 log_debug '[%s] Connection established', $self->addr;
166              
167 1         102 $self->channel(Argon::SecureChannel->new(
168             fh => $fh,
169             key => $self->key,
170             token => $self->token,
171             remote => $self->remote,
172             on_msg => K('_notify', $self),
173             on_ready => K('_ready', $self),
174             on_err => K('_error', $self),
175             on_close => K('_close', $self),
176             ));
177              
178 1         34 $self->opened->();
179             }
180             else {
181 1         40 log_debug '[%s] Connection attempt failed: %s', $self->addr, $!;
182 1         87 $self->cleanup;
183 1         44 $self->failed->($!);
184             }
185             }
186              
187             sub reply_cb {
188 2     2 0 16 my ($self, $msg, $cb, $retry) = @_;
189 2         216 $self->add_msg($msg->id, {
190             orig => $msg,
191             cb => $cb,
192             intvl => interval(1),
193             retry => $retry,
194             });
195             }
196              
197              
198             sub ping {
199 1     1 1 57 my ($self, $cb) = @_;
200 1         36 my $msg = Argon::Message->new(cmd => $PING);
201 1         13 $self->send($msg);
202 1         7 $self->reply_cb($msg, $cb);
203             }
204              
205              
206             sub queue {
207 0     0 1 0 my ($self, $class, $args, $cb) = @_;
208 0         0 my $msg = Argon::Message->new(cmd => $QUEUE, info => [$class, @$args]);
209 0         0 $self->send($msg);
210 0         0 $self->reply_cb($msg, $cb, $self->retry);
211             }
212              
213              
214             sub process {
215 0     0 1 0 Argon::ASSERT_EVAL_ALLOWED;
216 0         0 my ($self, $code_ref, $args, $cb) = @_;
217 0   0     0 $args ||= [];
218              
219 0         0 my $code = Dump($code_ref)
220             ->Purity(1)
221             ->Declare(1)
222             ->Out;
223              
224 0         0 $self->queue('Argon::Task', [$code, $args], $cb);
225             }
226              
227              
228             sub async ($\[&$]\@) {
229 0     0 1 0 my ($self, $code_ref, $args) = @_;
230 0         0 my $cv = AnyEvent->condvar;
231 0         0 $self->process($code_ref, $args, $cv);
232 0         0 tie my $async, 'Argon::Async', $cv;
233 0         0 return $async;
234             }
235              
236             sub cleanup {
237 2     2 0 8 my $self = shift;
238 2         90 $self->closed->();
239 2         106 $self->channel(undef);
240              
241 2         6 my $error = 'Remote host was disconnected before task completed';
242              
243 2         93 foreach my $id ($self->msg_ids) {
244 0         0 my $info = $self->get_msg($id);
245 0 0       0 my $cb = $info->{cb} or next;
246 0         0 my $msg = $info->{orig};
247 0         0 $cb->($msg->error($error));
248             }
249             }
250              
251 4     4   155 sub _ready { shift->ready->() }
252              
253             sub _error {
254 0     0   0 my ($self, $error) = @_;
255 0         0 log_error '[%s] %s', $self->addr, $error;
256 0         0 $self->cleanup;
257             }
258              
259             sub _close {
260 1     1   6 my ($self) = @_;
261 1         40 log_debug '[%s] Remote host disconnected', $self->addr;
262 1         120 $self->cleanup;
263             }
264              
265             sub _notify {
266 4     4   18 my ($self, $msg) = @_;
267              
268 4 100       151 if ($self->has_msg($msg->id)) {
269 2         75 my $info = $self->del_msg($msg->id);
270              
271 2 0 33     14 if ($msg->denied && $info->{retry}) {
272 0         0 my $copy = $info->{orig}->copy;
273 0         0 my $intvl = $info->{intvl}->();
274 0         0 log_debug 'Retrying message in %0.2fs: %s', $intvl, $info->{orig}->explain;
275              
276             $self->add_msg($copy->id, {
277             orig => $copy,
278             cb => $info->{cb},
279             intvl => $info->{intvl},
280 0         0 retry => 1,
281             timer => AnyEvent->timer(after => $intvl, cb => K('send', $self, $copy)),
282             });
283              
284 0         0 return;
285             }
286              
287 2 100       16 if ($info->{cb}) {
288 1         9 $info->{cb}->($msg);
289             }
290             else {
291 1         39 $self->notify->($msg);
292             }
293             }
294             else {
295 2         84 $self->notify->($msg);
296             }
297             }
298              
299             __PACKAGE__->meta->make_immutable;
300              
301             1;
302              
303             __END__
304              
305             =pod
306              
307             =encoding UTF-8
308              
309             =head1 NAME
310              
311             Argon::Client - Client-side connection class for Argon systems
312              
313             =head1 VERSION
314              
315             version 0.18
316              
317             =head1 SYNOPSIS
318              
319             use Argon::Client;
320             use AnyEvent;
321              
322             my $cv = AnyEvent->condvar;
323              
324             my $ar = Argon::Client->new(
325             host => 'some.host.net',
326             port => 1234,
327             retry => 1,
328             opened => $cv,
329             ready => sub{},
330             failed => sub{},
331             closed => sub{},
332             notify => sub{},
333             );
334              
335             $cv->recv;
336              
337             while (my $task = get_next_task) {
338             $ar->process($task->class, $task->args, \&task_complete);
339             }
340              
341             my $result = $ar->async(sub{ ... });
342             if ($result eq 'fnord') {
343             ...
344             }
345              
346             =head1 DESCRIPTION
347              
348             Provides the client connection to an L<Argon> network.
349              
350             =head1 ATTRIBUTES
351              
352             =head2 host
353              
354             The hostname of the L<Argon::Manager> serving as the entry point for the
355             Argon network.
356              
357             =head2 port
358              
359             The port number for the L<Argon::Manager>.
360              
361             =head2 retry
362              
363             By default, when the network is at capacity, new tasks may be rejected, causing
364             L<Argon::Message/result> to croak. If C<retry> is set, the C<Argon::Client>
365             will instead retry the task on a logarithmic backoff timer until the task is
366             accepted by the manager.
367              
368             =head2 opened
369              
370             A code ref that is triggered when the connection is initially opened.
371              
372             =head2 ready
373              
374             A code ref that is triggered when the connection has been opened and the
375             client is ready to begin sending tasks.
376              
377             =head2 failed
378              
379             A code ref that is triggered when the connection fails. The value of C<$!> is
380             passed as an argument.
381              
382             =head2 closed
383              
384             A code ref that is triggered when the connection to the remote host is
385             closed.
386              
387             =head2 notify
388              
389             When tasks are created without a callback (see L<Argon::Client/process>),
390             the C<notify> callback is used in its place. The L<Argon::Message> reply
391             is passed as an argument.
392              
393             =head1 METHODS
394              
395             =head2 ping
396              
397             Pings the L<Argon::Manager> and calls the supplied callback with the manager's
398             reply.
399              
400             $ar->ping(sub{ my $reply = shift; ... });
401              
402             =head2 queue
403              
404             Queues a task with the Ar manager. Accepts the name of a class accessible to
405             the workers defining a C<new> and C<run> method, an array of arguments to be
406             passed to C<new>, and an optional code ref to be called when the task is
407             complete. If not supplied, the L<Argon::Client/notify> method will be called in
408             its place.
409              
410             $ar->queue('Task::Class', $args_list, sub{
411             my $reply = shift;
412             ...
413             });
414              
415             =head2 process
416              
417             If the Ar workers were started with C<--allow-eval> and if the client process
418             itself has C<$Argon::ALLOW_EVAL> set to a true value, a code ref may be passed
419             in place of a task class. The code ref will be serialized using
420             L<Data::Dump::Streamer> and has limited support for closures.
421              
422             $ar->process(sub{ ... }, $args_list, sub{
423             my $reply = shift;
424             ...
425             });
426              
427             =head2 async
428              
429             As an alternative to passing a callback or using a default callback, the
430             C<async> method returns a tied scalar that, when accessed, blocks until the
431             result is available. Note that if the task resulted in an error, it is thrown
432             when the async is fetched.
433              
434             my $async = $ar->async(sub{ ... }, $arg_list);
435              
436             if ($async eq 'slood') {
437             ...
438             }
439              
440             See L<Argon::Async>.
441              
442             =head1 AUTHOR
443              
444             Jeff Ober <sysread@fastmail.fm>
445              
446             =head1 COPYRIGHT AND LICENSE
447              
448             This software is copyright (c) 2017 by Jeff Ober.
449              
450             This is free software; you can redistribute it and/or modify it under
451             the same terms as the Perl 5 programming language system itself.
452              
453             =cut