File Coverage

blib/lib/Argon/Client.pm
Criterion Covered Total %
statement 93 121 76.8
branch 9 14 64.2
condition 1 5 20.0
subroutine 26 30 86.6
pod 4 8 50.0
total 133 178 74.7


line stmt bran cond sub pod time code
1             package Argon::Client;
2             # ABSTRACT: Client-side connection class for Argon systems
3             $Argon::Client::VERSION = '0.17';
4              
5 2     2   1154 use strict;
  2         6  
  2         58  
6 2     2   10 use warnings;
  2         5  
  2         58  
7 2     2   8 use Carp;
  2         4  
  2         126  
8 2     2   10 use Moose;
  2         4  
  2         16  
9 2     2   12843 use AnyEvent;
  2         6  
  2         57  
10 2     2   602 use AnyEvent::Socket qw(tcp_connect);
  2         11877  
  2         132  
11 2     2   1432 use Data::Dump::Streamer;
  2         95820  
  2         16  
12 2     2   300 use Try::Tiny;
  2         3  
  2         118  
13 2     2   738 use Argon;
  2         33  
  2         63  
14 2     2   496 use Argon::Async;
  2         4  
  2         58  
15 2     2   13 use Argon::Constants qw(:commands :priorities);
  2         4  
  2         336  
16 2     2   15 use Argon::SecureChannel;
  2         3  
  2         53  
17 2     2   13 use Argon::Log;
  2         5  
  2         149  
18 2     2   15 use Argon::Message;
  2         5  
  2         48  
19 2     2   12 use Argon::Types;
  2         5  
  2         44  
20 2     2   16 use Argon::Util qw(K param interval);
  2         5  
  2         2500  
21              
22             with qw(Argon::Encryption);
23              
24              
25             has host => (
26             is => 'ro',
27             isa => 'Str',
28             required => 1,
29             );
30              
31              
32             has port => (
33             is => 'ro',
34             isa => 'Int',
35             required => 1,
36             );
37              
38              
39             has retry => (
40             is => 'ro',
41             isa => 'Bool',
42             default => 0,
43             );
44              
45              
46             has opened => (
47             is => 'rw',
48             isa => 'Ar::Callback',
49             default => sub { sub {} },
50             );
51              
52              
53             has ready => (
54             is => 'rw',
55             isa => 'Ar::Callback',
56             default => sub { sub {} },
57             );
58              
59              
60             has failed => (
61             is => 'rw',
62             isa => 'Ar::Callback',
63             default => sub { sub {} },
64             );
65              
66              
67             has closed => (
68             is => 'rw',
69             isa => 'Ar::Callback',
70             default => sub { sub {} },
71             );
72              
73              
74             has notify => (
75             is => 'rw',
76             isa => 'Ar::Callback',
77             default => sub { sub {} },
78             );
79              
80             has remote => (
81             is => 'rw',
82             isa => 'Maybe[Str]',
83             );
84              
85             has msg => (
86             is => 'rw',
87             isa => 'HashRef',
88             default => sub {{}},
89             traits => ['Hash'],
90             handles => {
91             has_msg => 'exists',
92             get_msg => 'get',
93             add_msg => 'set',
94             del_msg => 'delete',
95             msg_ids => 'keys',
96             msgs => 'values',
97             },
98             );
99              
100             has channel => (
101             is => 'rw',
102             isa => 'Maybe[Argon::SecureChannel]',
103             handles => [qw(send)],
104             );
105              
106             has addr => (
107             is => 'ro',
108             isa => 'Str',
109             lazy => 1,
110             builder => '_build_addr',
111             );
112              
113             sub _build_addr {
114 3     3   9 my $self = shift;
115 3         114 join ':', $self->host, $self->port;
116             }
117              
118              
119             around BUILDARGS => sub {
120             my $orig = shift;
121             my $class = shift;
122             my %args = @_;
123              
124             if (exists $args{channel}) {
125             # Match encryption settings
126             $args{$_} = $args{channel}{$_}
127             foreach grep { exists $args{channel}{$_} }
128             qw(key keyfile cipher token);
129             }
130              
131             $class->$orig(%args);
132             };
133              
134             sub BUILD {
135 5     5 0 21 my ($self, $args) = @_;
136              
137 5 100       179 if ($self->channel) {
138             # Set callbacks
139 3         91 $self->channel->on_msg(K('_notify', $self));
140 3         94 $self->channel->on_err(K('_error', $self));
141 3         105 $self->channel->on_close(K('_close', $self));
142              
143 3 50       97 if ($self->channel->is_ready) {
144 0         0 $self->opened->();
145 0         0 $self->ready->();
146             } else {
147 3         97 $self->channel->on_ready(K('_ready', $self));
148 3         102 $self->opened->();
149             }
150             }
151             else {
152 2         9 $self->connect;
153             }
154             }
155              
156             sub connect {
157 2     2 0 8 my $self = shift;
158 2         58 log_debug 'Connecting to %s', $self->addr;
159 2         240 tcp_connect $self->host, $self->port, K('_connected', $self);
160             }
161              
162             sub _connected {
163 2     2   8 my ($self, $fh) = @_;
164              
165 2 100       11 if ($fh) {
166 1         44 log_debug '[%s] Connection established', $self->addr;
167              
168 1         118 $self->channel(Argon::SecureChannel->new(
169             fh => $fh,
170             key => $self->key,
171             token => $self->token,
172             remote => $self->remote,
173             on_msg => K('_notify', $self),
174             on_ready => K('_ready', $self),
175             on_err => K('_error', $self),
176             on_close => K('_close', $self),
177             ));
178              
179 1         42 $self->opened->();
180             }
181             else {
182 1         30 log_debug '[%s] Connection attempt failed: %s', $self->addr, $!;
183 1         53 $self->cleanup;
184 1         122 $self->failed->($!);
185             }
186             }
187              
188             sub reply_cb {
189 2     2 0 16 my ($self, $msg, $cb, $retry) = @_;
190 2         79 $self->add_msg($msg->id, {
191             orig => $msg,
192             cb => $cb,
193             intvl => interval(1),
194             retry => $retry,
195             });
196             }
197              
198              
199             sub ping {
200 1     1 1 41 my ($self, $cb) = @_;
201 1         24 my $msg = Argon::Message->new(cmd => $PING);
202 1         8 $self->send($msg);
203 1         4 $self->reply_cb($msg, $cb);
204             }
205              
206              
207             sub queue {
208 0     0 1 0 my ($self, $class, $args, $cb) = @_;
209 0         0 my $msg = Argon::Message->new(cmd => $QUEUE, info => [$class, @$args]);
210 0         0 $self->send($msg);
211 0         0 $self->reply_cb($msg, $cb, $self->retry);
212             }
213              
214              
215             sub process {
216 0     0 1 0 Argon::ASSERT_EVAL_ALLOWED;
217 0         0 my ($self, $code_ref, $args, $cb) = @_;
218 0   0     0 $args ||= [];
219              
220 0         0 my $code = Dump($code_ref)
221             ->Purity(1)
222             ->Declare(1)
223             ->Out;
224              
225 0         0 $self->queue('Argon::Task', [$code, $args], $cb);
226             }
227              
228              
229             sub async ($\[&$]\@) {
230 0     0 1 0 my ($self, $code_ref, $args) = @_;
231 0         0 my $cv = AnyEvent->condvar;
232 0         0 $self->process($code_ref, $args, $cv);
233 0         0 tie my $async, 'Argon::Async', $cv;
234 0         0 return $async;
235             }
236              
237             sub cleanup {
238 2     2 0 4 my $self = shift;
239 2         56 $self->closed->();
240 2         101 $self->channel(undef);
241              
242 2         4 my $error = 'Remote host was disconnected before task completed';
243              
244 2         55 foreach my $id ($self->msg_ids) {
245 0         0 my $info = $self->get_msg($id);
246 0 0       0 my $cb = $info->{cb} or next;
247 0         0 my $msg = $info->{orig};
248 0         0 $cb->($msg->error($error));
249             }
250             }
251              
252 4     4   126 sub _ready { shift->ready->() }
253              
254             sub _error {
255 0     0   0 my ($self, $error) = @_;
256 0         0 log_error '[%s] %s', $self->addr, $error;
257 0         0 $self->cleanup;
258             }
259              
260             sub _close {
261 1     1   2 my ($self) = @_;
262 1         28 log_debug '[%s] Remote host disconnected', $self->addr;
263 1         67 $self->cleanup;
264             }
265              
266             sub _notify {
267 4     4   15 my ($self, $msg) = @_;
268              
269 4 100       133 if ($self->has_msg($msg->id)) {
270 2         68 my $info = $self->del_msg($msg->id);
271              
272 2 0 33     13 if ($msg->denied && $info->{retry}) {
273 0         0 my $copy = $info->{orig}->copy;
274 0         0 my $intvl = $info->{intvl}->();
275 0         0 log_debug 'Retrying message in %0.2fs: %s', $intvl, $info->{orig}->explain;
276              
277             $self->add_msg($copy->id, {
278             orig => $copy,
279             cb => $info->{cb},
280             intvl => $info->{intvl},
281 0         0 retry => 1,
282             timer => AnyEvent->timer(after => $intvl, cb => K('send', $self, $copy)),
283             });
284              
285 0         0 return;
286             }
287              
288 2 100       15 if ($info->{cb}) {
289 1         10 $info->{cb}->($msg);
290             }
291             else {
292 1         112 $self->notify->($msg);
293             }
294             }
295             else {
296 2         64 $self->notify->($msg);
297             }
298             }
299              
300             __PACKAGE__->meta->make_immutable;
301              
302             1;
303              
304             __END__
305              
306             =pod
307              
308             =encoding UTF-8
309              
310             =head1 NAME
311              
312             Argon::Client - Client-side connection class for Argon systems
313              
314             =head1 VERSION
315              
316             version 0.17
317              
318             =head1 SYNOPSIS
319              
320             use Argon::Client;
321             use AnyEvent;
322              
323             my $cv = AnyEvent->condvar;
324              
325             my $ar = Argon::Client->new(
326             host => 'some.host.net',
327             port => 1234,
328             retry => 1,
329             opened => $cv,
330             ready => sub{},
331             failed => sub{},
332             closed => sub{},
333             notify => sub{},
334             );
335              
336             $cv->recv;
337              
338             while (my $task = get_next_task) {
339             $ar->process($task->class, $task->args, \&task_complete);
340             }
341              
342             my $result = $ar->async(sub{ ... });
343             if ($result eq 'fnord') {
344             ...
345             }
346              
347             =head1 DESCRIPTION
348              
349             Provides the client connection to an L<Argon> network.
350              
351             =head1 ATTRIBUTES
352              
353             =head2 host
354              
355             The hostname of the L<Argon::Manager> serving as the entry point for the
356             Argon network.
357              
358             =head2 port
359              
360             The port number for the L<Argon::Manager>.
361              
362             =head2 retry
363              
364             By default, when the network is at capacity, new tasks may be rejected, causing
365             L<Argon::Message/result> to croak. If C<retry> is set, the C<Argon::Client>
366             will instead retry the task on a logarithmic backoff timer until the task is
367             accepted by the manager.
368              
369             =head2 opened
370              
371             A code ref that is triggered when the connection is initially opened.
372              
373             =head2 ready
374              
375             A code ref that is triggered when the connection has been opened and the
376             client is ready to begin sending tasks.
377              
378             =head2 failed
379              
380             A code ref that is triggered when the connection fails. The value of C<$!> is
381             passed as an argument.
382              
383             =head2 closed
384              
385             A code ref that is triggered when the connection to the remote host is
386             closed.
387              
388             =head2 notify
389              
390             When tasks are created without a callback (see L<Argon::Client/process>),
391             the C<notify> callback is used in its place. The L<Argon::Message> reply
392             is passed as an argument.
393              
394             =head1 METHODS
395              
396             =head2 ping
397              
398             Pings the L<Argon::Manager> and calls the supplied callback with the manager's
399             reply.
400              
401             $ar->ping(sub{ my $reply = shift; ... });
402              
403             =head2 queue
404              
405             Queues a task with the Ar manager. Accepts the name of a class accessible to
406             the workers defining a C<new> and C<run> method, an array of arguments to be
407             passed to C<new>, and an optional code ref to be called when the task is
408             complete. If not supplied, the L<Argon::Client/notify> method will be called in
409             its place.
410              
411             $ar->queue('Task::Class', $args_list, sub{
412             my $reply = shift;
413             ...
414             });
415              
416             =head2 process
417              
418             If the Ar workers were started with C<--allow-eval> and if the client process
419             itself has C<$Argon::ALLOW_EVAL> set to a true value, a code ref may be passed
420             in place of a task class. The code ref will be serialized using
421             L<Data::Dump::Streamer> and has limited support for closures.
422              
423             $ar->process(sub{ ... }, $args_list, sub{
424             my $reply = shift;
425             ...
426             });
427              
428             =head2 async
429              
430             As an alternative to passing a callback or using a default callback, the
431             C<async> method returns a tied scalar that, when accessed, blocks until the
432             result is available. Note that if the task resulted in an error, it is thrown
433             when the async is fetched.
434              
435             my $async = $ar->async(sub{ ... }, $arg_list);
436              
437             if ($async eq 'slood') {
438             ...
439             }
440              
441             See L<Argon::Async>.
442              
443             =head1 AUTHOR
444              
445             Jeff Ober <sysread@fastmail.fm>
446              
447             =head1 COPYRIGHT AND LICENSE
448              
449             This software is copyright (c) 2017 by Jeff Ober.
450              
451             This is free software; you can redistribute it and/or modify it under
452             the same terms as the Perl 5 programming language system itself.
453              
454             =cut