File Coverage

blib/lib/Argon.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package Argon;
2              
3             our $VERSION = '0.16';
4              
5 1     1   443 use strict;
  1         1  
  1         30  
6 1     1   4 use warnings;
  1         1  
  1         22  
7 1     1   3 use Carp;
  1         6  
  1         52  
8 1     1   456 use Const::Fast;
  1         2132  
  1         4  
9 1     1   332 use Coro;
  0            
  0            
10             use Scalar::Util qw(weaken refaddr);
11             use POSIX qw(strftime);
12             use Log::Log4perl qw();
13              
14             if ($^O eq 'MSWin32') {
15             die 'MSWin32 is not supported';
16             }
17              
18             require Exporter;
19             use base qw/Exporter/;
20              
21             our %EXPORT_TAGS = (
22             # Priorities
23             priorities => [qw($PRI_HIGH $PRI_NORMAL $PRI_LOW)],
24              
25             # Command verbs and responses
26             commands => [qw(
27             $CMD_PING $CMD_QUEUE $CMD_COLLECT $CMD_REGISTER $CMD_STATUS
28             $CMD_ACK $CMD_COMPLETE $CMD_ERROR $CMD_REJECTED
29             )],
30              
31             logging => [qw(
32             SET_LOG_LEVEL
33             $TRACE TRACE
34             $DEBUG DEBUG
35             $INFO INFO
36             $WARN WARN
37             $ERROR ERROR
38             $FATAL FATAL
39             )],
40             );
41              
42             our @EXPORT_OK = ('K', map { @$_ } values %EXPORT_TAGS);
43              
44             #-------------------------------------------------------------------------------
45             # Returns a new function suitable for use as a callback. This is useful to pass
46             # instance methods as callbacks without leaking references.
47             #
48             # Inputs:
49             # $fn : CODE reference or function name
50             # $context : class name or object instance
51             # @args : other arguments to pass to $fn
52             #
53             # Output:
54             # CODE reference
55             #
56             # Examples:
57             # # Using a function reference
58             # my $cb = K(\&on_connection);
59             #
60             # # Using an instance method
61             # my $cb = K('on_connection', $client);
62             #
63             # # Using a class method
64             # my $cb = K('on_connection', 'ClientClass');
65             #
66             # # With extra arguments
67             # my $cb = K('on_connection', $client, 'x', 'y', 'z');
68             #-------------------------------------------------------------------------------
69             sub K {
70             my ($fn, $context, @args) = @_;
71              
72             croak "unknown method $fn"
73             if !ref $context
74             || !$context->can($fn);
75              
76             weaken $context;
77             my $k = $context->can($fn);
78              
79             return sub {
80             unshift @_, $context, @args;
81             goto $k;
82             };
83             }
84              
85             #-------------------------------------------------------------------------------
86             # Defaults
87             #-------------------------------------------------------------------------------
88             our $EOL = "\n"; # end of line/message character(s)
89             our $MSG_SEPARATOR = ' '; # separator between parts of a message (command, priority, payload, etc)
90             our $TRACK_MESSAGES = 10; # number of message times to track for computing avg processing time at a host
91             our $POLL_INTERVAL = 5; # number of seconds between polls for connectivity between cluster/node
92             our $CONNECT_TIMEOUT = 5; # number of seconds after which a stream times out attempting to connect
93             our $DEL_COMPLETE_AFTER = 30 * 60; # number of seconds after which a completed task's result is delete if not collected
94              
95             #-------------------------------------------------------------------------------
96             # Priorities
97             #-------------------------------------------------------------------------------
98             const our $PRI_HIGH => Coro::PRIO_HIGH;
99             const our $PRI_NORMAL => Coro::PRIO_NORMAL;
100             const our $PRI_LOW => Coro::PRIO_MIN;
101              
102             #-------------------------------------------------------------------------------
103             # Commands
104             #-------------------------------------------------------------------------------
105             const our $CMD_PING => 0; # Verify that a worker is responding
106             const our $CMD_QUEUE => 1; # Queue a message
107             const our $CMD_COLLECT => 2; # Collect results
108             const our $CMD_REGISTER => 3; # Add a node to a cluster
109             const our $CMD_STATUS => 4; # Get process and system status from a manager
110              
111             const our $CMD_ACK => 5; # Acknowledgement (respond OK)
112             const our $CMD_COMPLETE => 6; # Response - message is complete
113             const our $CMD_ERROR => 7; # Response - error processing message or invalid message format
114             const our $CMD_REJECTED => 8; # Response - no available capacity for handling tasks
115              
116             #-------------------------------------------------------------------------------
117             # Logging
118             #-------------------------------------------------------------------------------
119             const our $TRACE => $Log::Log4perl::TRACE;
120             const our $DEBUG => $Log::Log4perl::DEBUG;
121             const our $INFO => $Log::Log4perl::INFO;
122             const our $WARN => $Log::Log4perl::WARN;
123             const our $ERROR => $Log::Log4perl::ERROR;
124             const our $FATAL => $Log::Log4perl::FATAL;
125              
126             my $LOGGER = Log::Log4perl->get_logger('argon');
127              
128             sub SET_LOG_LEVEL {
129             Log::Log4perl->easy_init($_[0]);
130             }
131              
132             # Strips an error message of line number and file information.
133             sub error {
134             my $msg = shift;
135             $msg =~ s/ at (.+?) line \d+.//gsm;
136             $msg =~ s/eval {...} called$//gsm;
137             $msg =~ s/\s+$//gsm;
138             $msg =~ s/^\s+//gsm;
139             return $msg;
140             }
141              
142             sub LOG {
143             my $lvl = lc shift;
144             my $coro = $Coro::current + 0;
145             my $msg = sprintf('[%s] [%s] => %s', $$, $coro, error(sprintf(shift, @_)));
146             $LOGGER->$lvl($msg);
147             }
148              
149             sub TRACE { LOG(trace => @_) }
150             sub DEBUG { LOG(debug => @_) }
151             sub INFO { LOG(info => @_) }
152             sub WARN { LOG(warn => @_) }
153             sub ERROR { LOG(error => @_) }
154             sub FATAL { LOG(fatal => @_) }
155              
156             SET_LOG_LEVEL $ERROR
157             unless Log::Log4perl->initialized;
158              
159             1;