File Coverage

lib/IPC/Transit.pm
Criterion Covered Total %
statement 39 41 95.1
branch n/a
condition n/a
subroutine 14 14 100.0
pod n/a
total 53 55 96.3


line stmt bran cond sub pod time code
1             package IPC::Transit;
2             $IPC::Transit::VERSION = '1.161450';
3 16     16   9498 use strict;use warnings;
  16     16   32  
  16         425  
  16         74  
  16         24  
  16         366  
4 16     16   248 use 5.006;
  16         52  
5 16     16   4395 use IPC::Transit::Internal;
  16         41  
  16         517  
6 16     16   8607 use Storable;
  16         38297  
  16         841  
7 16     16   4760 use Data::Dumper;
  16         45584  
  16         832  
8 16     16   7909 use JSON;
  16         95110  
  16         91  
9 16     16   1908 use Sereal::Encoder;
  16         32  
  16         574  
10 16     16   80 use Sereal::Decoder qw(looks_like_sereal decode_sereal);
  16         29  
  16         858  
11 16     16   5871 use Sys::Hostname;
  16         12435  
  16         772  
12 16     16   6369 use HTTP::Lite;
  16         124748  
  16         630  
13 16     16   10712 use File::Temp qw/tempfile/;
  16         221326  
  16         1019  
14 16     16   6853 use Tie::DNS;
  16         1007578  
  16         1813  
15 16     16   4176 use Crypt::Sodium;
  0            
  0            
16             use MIME::Base64;
17              
18             use vars qw(
19             $config_file $config_dir $large_transit_message_dir
20             $local_queues
21             );
22              
23             $IPC::Transit::my_keys = {
24             default => 'ftlMCefNymrF66r2VlFBgHYbWRZqSJPzVg4Vz/I86UQ='
25             };
26             $IPC::Transit::public_keys = {
27             default => 'vbqcxUUGIOvIKzpFWyBbYrSTsmSGj+/zlkF9H3tJ0DI='
28             };
29              
30             our $large_transit_message_dir = '/tmp/transit_large_messages'
31             unless $IPC::Transit::large_transit_message_dir;
32              
33             ##sorry, gotta have this temp dir in a known location
34             mkdir $large_transit_message_dir unless -d $large_transit_message_dir;
35             chmod 0777, $large_transit_message_dir; ##sorry, it has to be 0777 :(
36              
37             our $wire_header_arg_translate = {
38             destination => 'd',
39             destination_qname => 'q',
40             compression => 'c',
41             serializer => 's',
42             message_length => 'l',
43             local_filename => 'f',
44             ttl => 't',
45             nonce => 'n',
46             source => 'S',
47             };
48             our $max_message_size = 4096 unless $IPC::Transit::max_message_size;
49              
50             {
51             my %dns;
52             my $cache;
53             my $ts;
54             sub cached_dns {
55             my $thing = shift;
56             return $thing unless $thing;
57             if(not $ts) {
58             $ts = time;
59             $cache = {};
60             tie %dns, 'Tie::DNS' unless %dns;
61             }
62             if(time > $ts + 10) {
63             $ts = time;
64             $cache = {};
65             }
66             my $ret = eval {
67             local $SIG{ALRM} = sub { die "timed out\n"; };
68             alarm 2;
69             return $dns{$thing};
70             };
71             alarm 0;
72             $ret = $thing unless $ret;
73             return $ret;
74             }
75             }
76              
77             #This validates some allowed values of wire header arguments
78             our $wire_header_args = {
79             s => { #serializer
80             json => 1,
81             sereal => 1,
82             yaml => 1,
83             storable => 1,
84             dumper => 1,
85             },
86             c => { #compression
87             zlib => 1,
88             snappy => 1,
89             none => 1
90             },
91             d => 1, #destination address
92             t => 1, #hop TTL
93             q => 1, #destination qname
94             l => 1, #length of the message itself
95             f => 1, #local_filename, optionally a path on the filesystem where the message can be found
96             t => 1, #Time To Live
97             #for crypto
98             n => 1, #nonce
99             S => 1, #source
100             };
101             our $std_args = {
102             message => 1,
103             qname => 1,
104             nowait => 1,
105             encrypt => 1,
106             };
107              
108             sub send {
109             my %args;
110             { my @args = @_;
111             die 'IPC::Transit::send: even number of arguments required'
112             if scalar @args % 2;
113             %args = @args;
114             }
115             my $qname = $args{qname};
116             die "IPC::Transit::send: parameter 'qname' required"
117             unless $qname;
118             die "IPC::Transit::send: parameter 'qname' must be a scalar"
119             if ref $qname;
120             my $message = $args{message};
121             die "IPC::Transit::send: parameter 'message' required"
122             unless $message;
123             die "IPC::Transit::send: parameter 'message' must be a HASH reference"
124             if ref $message ne 'HASH';
125             $message->{'.ipc_transit_meta'} = {} unless $message->{'.ipc_transit_meta'};
126             $message->{'.ipc_transit_meta'}->{send_ts} = time;
127             if($args{encrypt} and not $args{destination}) {
128             die "IPC::Transit::send: parameter 'destination' must exist if encryption is selected";
129             }
130             if($args{destination}) {
131             #let's take a stab at efficiently getting destination to either an
132             #IP address or a FQDN
133             #If destination has less then tree .'s in it, then we will do a DNS
134             #lookup on it, and if that returns anything, replace it
135             if(not $args{no_dns_normalize}) {
136             if($args{destination} =~ tr/\./\./ < 3) {
137             my $new = cached_dns($args{destination});
138             $args{destination} = $new if $new;
139             }
140             }
141             $args{destination_qname} = $args{qname};
142             $args{qname} = 'transitd';
143             $args{ttl} = '9' unless $args{ttl};
144              
145             return _deliver_non_local($qname, \%args);
146             }
147              
148             #begin the hard work of figuring out if this message should be sent as
149             #local delivery or not.
150             #overall default is to non-local delivery
151              
152             #the overrides in .ipc_transit_meta in the message takes precidence
153             #over previous calls to ::local_queue and/or ::no_local_queue
154              
155             #insides of overrides, the force_local and force_non_local
156             #take precidence over the default_to.
157              
158             #algo
159             #1. absolute override goes to the invocation: override_local/
160             # override_non_local
161             #2. next, look at force_* in the message. If they conflict, then we go
162             # with force_non_local.
163             #3. lacking any instructions there, we go with the default_to directive,
164             # if any, in the message
165             #4. lacking that, we go with what's been set with ::local_queue and/or
166             # ::no_local_queue
167             #5. And finally, non-local delivery
168              
169              
170             #1a:
171             return _deliver_non_local($qname, \%args) if $args{override_local};
172              
173             #1b:
174             return _deliver_local($qname, \%args) if $args{override_non_local};
175              
176             #2a:
177             if( $message->{'.ipc_transit_meta'}->{overrides} and
178             $message->{'.ipc_transit_meta'}->{overrides}->{force_non_local} and
179             $message->{'.ipc_transit_meta'}->{overrides}->{force_non_local}->{$args{qname}}) {
180             return _deliver_non_local($qname, \%args);
181             }
182              
183             #2b:
184             if( $message->{'.ipc_transit_meta'}->{overrides} and
185             $message->{'.ipc_transit_meta'}->{overrides}->{force_local} and
186             $message->{'.ipc_transit_meta'}->{overrides}->{force_local}->{$args{qname}}) {
187             return _deliver_local($qname, \%args);
188             }
189              
190             #3a:
191             if( $message->{'.ipc_transit_meta'}->{overrides} and
192             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} and
193             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} eq 'local'
194             ) {
195             return _deliver_local($qname, \%args);
196             }
197              
198             #3b:
199             if( $message->{'.ipc_transit_meta'}->{overrides} and
200             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} and
201             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} eq 'non-local'
202             ) {
203             return _deliver_non_local($qname, \%args);
204             }
205              
206             #4:
207             if( $local_queues and
208             $local_queues->{$qname}) {
209             return _deliver_local($qname, \%args);
210             }
211              
212             #5:
213             return _deliver_non_local($qname, \%args);
214             }
215              
216             sub _deliver_local {
217             my ($qname, $args) = @_;
218             push @{$local_queues->{$qname}}, $args;
219             return $args;
220             }
221              
222             sub _get_tmp_file {
223             my ($fh, $filename) = tempfile(SUFFIX => '.transit', DIR => $large_transit_message_dir);
224             die 'failed to create tmpfile' unless -e $filename;
225             return ($fh, $filename);
226             }
227              
228              
229             sub _deliver_non_local {
230             my ($qname, $args) = @_;
231             my $to_queue = IPC::Transit::Internal::_initialize_queue(%$args);
232             eval {
233             local $SIG{ALRM} = sub { die "timed out\n"; };
234             alarm 2;
235             pack_message($args);
236             };
237             alarm 0;
238             if($@) {
239             print STDERR "IPC::Transit::_deliver_non_local: pack_message failed: $@\n";
240             unlink $args->{local_filename}
241             if $args->{local_filename} and -e $args->{local_filename};
242             return undef;
243             }
244             my $ret = $to_queue->snd(1,$args->{serialized_wire_data}, IPC::Transit::Internal::_get_flags('nonblock'));
245             unlink $args->{local_filename}
246             if not $ret and $args->{local_filename};
247             return $ret;
248             }
249              
250             sub stats {
251             my $info = IPC::Transit::Internal::_stats();
252             return $info;
253             }
254             sub stat {
255             my %args;
256             { my @args = @_;
257             die 'IPC::Transit::stat: even number of arguments required'
258             if scalar @args % 2;
259             %args = @args;
260             }
261             my $qname = $args{qname};
262             if(not $args{override_local} and $local_queues and $local_queues->{$qname}) {
263             return {
264             qnum => scalar @{$local_queues->{$qname}}
265             };
266             }
267             die "IPC::Transit::stat: parameter 'qname' required"
268             unless $qname;
269             die "IPC::Transit::stat: parameter 'qname' must be a scalar"
270             if ref $qname;
271             my $info = IPC::Transit::Internal::_stat(%args);
272             }
273              
274             sub receive {
275             my %args;
276             { my @args = @_;
277             die 'IPC::Transit::receive: even number of arguments required'
278             if scalar @args % 2;
279             %args = @args;
280             }
281             my $qname = $args{qname};
282              
283             die "IPC::Transit::receive: parameter 'qname' required"
284             unless $qname;
285             die "IPC::Transit::receive: parameter 'qname' must be a scalar"
286             if ref $qname;
287             if( not $args{override_local} and
288             $local_queues and
289             $local_queues->{$qname}) {
290             my $m = shift @{$local_queues->{$qname}};
291             return $m->{message};
292             }
293             my $ret = eval {
294             my $flags = IPC::Transit::Internal::_get_flags('nowait') if $args{nonblock};
295             my $from_queue = IPC::Transit::Internal::_initialize_queue(%args);
296             my $ref = { #just doing this so we can pass the possibly big serialized
297             #data around as a reference
298             serialized_wire_data => '',
299             };
300             if(not $from_queue->rcv($ref->{serialized_wire_data}, 102400000, 0, $flags)) {
301             return undef;
302             }
303             if(not defined $ref->{serialized_wire_data}) {
304             print STDERR "IPC::Transit::receive: received message had no data";
305             return undef;
306             }
307              
308             my ($header_length, $wire_headers) = _parse_wire_header($ref);
309             if(not defined $wire_headers) {
310             print STDERR 'IPC::Transit::receive: received message had no wire headers: ' . substr($ref->{serialized_wire_data}, 0, 30) . "\n";
311             return undef;
312             }
313             if(not defined $header_length) {
314             print STDERR 'IPC::Transit::receive: received message had no header length: ' . substr($ref->{serialized_wire_data}, 0, 30) . "\n";
315             return undef;
316             }
317             sync_serialized_wire_data($wire_headers, $ref);
318              
319             my $message = {
320             wire_headers => $wire_headers,
321             serialized_message => substr(
322             $ref->{serialized_wire_data},
323             $header_length + length($header_length) + 1,
324             9999999, # :(
325             ),
326             };
327             my $used_default_public = 1;
328             if($message->{wire_headers}->{n}) {
329             #we be encrypted
330             #validate $IPC::Transit::my_keys->{private}
331             #
332             my $source = $message->{wire_headers}->{S};
333             my $public_key;
334             if($IPC::Transit::public_keys->{$source}) {
335             $public_key = $IPC::Transit::public_keys->{$source};
336             $used_default_public = 0;
337             } else {
338             $public_key = $IPC::Transit::public_keys->{default};
339             }
340             my @private_keys = ($IPC::Transit::my_keys->{default});
341             push @private_keys, $IPC::Transit::my_keys->{private}
342             if $IPC::Transit::my_keys->{private};
343             my $nonce = decode_base64($message->{wire_headers}->{n});
344             my $public_keys;
345             if(not ref $public_key) {
346             $public_keys = [$public_key];
347             } else {
348             $public_keys = $public_key;
349             }
350             push @$public_keys, $IPC::Transit::public_keys->{default};
351             my $cleartext;
352             PUBLIC:
353             foreach my $public (@$public_keys) {
354             foreach my $private_key (@private_keys) {
355             $cleartext = crypto_box_open(
356             $message->{serialized_message},
357             $nonce,
358             decode_base64($public),
359             decode_base64($private_key),
360             );
361             last PUBLIC if $cleartext;
362             }
363             }
364             $message->{serialized_message} = $cleartext;
365             }
366             return undef unless _thaw($message);
367             $message->{message}->{'.ipc_transit_meta'}->{encrypt_source} =
368             $message->{wire_headers}->{S} if $message->{wire_headers}->{S};
369             $message->{message}->{'.ipc_transit_meta'}->{encrypt_source} = 'default'
370             if $used_default_public;
371             return $message if $args{raw};
372             return $message->{message};
373             };
374             die $@ if $@;
375             return $ret;
376             }
377              
378             sub sync_serialized_wire_data {
379             my ($wire_headers, $ref) = @_;
380             if($wire_headers->{f} and -r $wire_headers->{f}) {
381             eval {
382             local $SIG{ALRM} = sub { die "timed out\n"; };
383             alarm 5;
384             open my $fh, '<', $wire_headers->{f}
385             or die "failed to open $wire_headers->{f} for reading: $!";
386             read $fh, $ref->{serialized_wire_data}, 1024000000
387             or die "failed to read from $wire_headers->{f}: $!";
388             close $fh or die "failed to close $wire_headers->{f}: $!";
389             };
390             alarm 0;
391             unlink $wire_headers->{f};
392             }
393             }
394              
395             sub post_remote {
396             #This is very simple, first-generation logic. It assumes that every
397             #message that is received that has a qname set is destined for off box.
398              
399             #so here, we want to post this message to the destination over http
400             my $message = shift;
401             my $http = HTTP::Lite->new;
402             my $vars = {
403             message => $message->{serialized_wire_data},
404             };
405             $http->prepare_post($vars);
406             my $url = 'http://' . $message->{message}->{'.ipc_transit_meta'}->{destination} . ':9816/message';
407             my $req;
408             eval {
409             $req = $http->request($url)
410             or die "Unable to get document: $!";
411             };
412             print STDERR "IPC::Transit::post_remote: (\$url=$url) failed: $@\n" if $@;
413             return $req;
414             }
415              
416             sub no_local_queue {
417             my %args;
418             { my @args = @_;
419             die 'IPC::Transit::no_local_queue: even number of arguments required'
420             if scalar @args % 2;
421             %args = @args;
422             }
423             my $qname = $args{qname};
424             delete $local_queues->{$qname};
425             return 1;
426             }
427              
428             sub queue_exists {
429             my $qname = shift;
430             return IPC::Transit::Internal::_queue_exists($qname);
431             }
432              
433             sub _parse_wire_header {
434             my $ref = shift;
435             if($ref->{serialized_wire_data} !~ /^(\d+)/sm) {
436             print STDERR 'IPC::Transit::_parse_wire_header: malformed message received: ' . substr($ref->{serialized_wire_data}, 0, 60) . "\n";
437             return (undef, undef);
438             }
439             my $header_length = $1;
440             return (
441             $header_length,
442             deserialize_wire_meta(
443             substr( $ref->{serialized_wire_data},
444             length($header_length) + 1,
445             $header_length
446             )
447             ),
448             );
449             }
450             sub local_queue {
451             my %args;
452             { my @args = @_;
453             die 'IPC::Transit::local_queue: even number of arguments required'
454             if scalar @args % 2;
455             %args = @args;
456             }
457             my $qname = $args{qname};
458             $local_queues = {} unless $local_queues;
459             $local_queues->{$qname} = [] unless $local_queues->{$qname};
460             return 1;
461             }
462              
463             sub pack_message {
464             my $args = shift;
465             $args->{message}->{'.ipc_transit_meta'} = {}
466             unless $args->{message}->{'.ipc_transit_meta'};
467             foreach my $key (keys %$wire_header_arg_translate) {
468             next unless $args->{$key};
469             $args->{$wire_header_arg_translate->{$key}} = $args->{$key};
470             }
471             foreach my $key (keys %$args) {
472             next if $wire_header_args->{$key};
473             next if $std_args->{$key};
474             $args->{message}->{'.ipc_transit_meta'}->{$key} = $args->{$key};
475             }
476             if($args->{encrypt}) {
477             $args->{message}->{'.ipc_transit_meta'}->{destination} = $args->{destination};
478             }
479             $args->{message}->{'.ipc_transit_meta'}->{source_hostname} = _get_my_hostname();
480             if($args->{encrypt}) {
481             my $sender = _get_my_hostname();
482             if(not $sender) {
483             die 'encrypt selected but unable to determine hostname. Set $IPC::Transit::my_hostname to override';
484             }
485             }
486             if($args->{encrypt}) {
487             my $nonce = crypto_box_nonce();
488             $args->{nonce} = encode_base64($nonce);
489              
490             my $my_private_key;
491             if($IPC::Transit::my_keys->{private}) {
492             $my_private_key = $IPC::Transit::my_keys->{private};
493             $args->{message}->{'.ipc_transit_meta'}->{signed_destination} = 'my_private';
494             } else {
495             $my_private_key = $IPC::Transit::my_keys->{default};
496             $args->{message}->{'.ipc_transit_meta'}->{signed_destination} = 'default';
497             }
498             my $their_public_key;
499             if($IPC::Transit::public_keys->{$args->{destination}}) {
500             $their_public_key = $IPC::Transit::public_keys->{$args->{destination}};
501             } else {
502             $their_public_key = $IPC::Transit::public_keys->{default};
503             }
504             $args->{serialized_message} = _freeze($args);
505             my $cipher_text = crypto_box(
506             $args->{serialized_message},
507             $nonce,
508             decode_base64($their_public_key),
509             decode_base64($my_private_key)
510             );
511             $args->{serialized_message} = $cipher_text;
512             $args->{source} = _get_my_hostname();
513             } else {
514             $args->{serialized_message} = _freeze($args);
515             }
516             $args->{message_length} = length $args->{serialized_message};
517             if($args->{message_length} > $IPC::Transit::max_message_size) {
518             my $s;
519             eval {
520             my $fh;
521             ($fh, $args->{local_filename}) = _get_tmp_file();
522             $s = serialize_wire_meta($args);
523             print $fh "$s$args->{serialized_message}"
524             or die "failed to write to file $args->{local_filename}: $!";
525             close $fh or die "failed to close $args->{local_filename}: $!";
526             chmod 0666, $args->{local_filename};
527             };
528             if($@) {
529             unlink $args->{local_filename};
530             die "IPC::Transit::pack_message: failed: $@";
531             }
532             $args->{serialized_wire_data} = $s;
533             return;
534             }
535             my $s = serialize_wire_meta($args);
536             $args->{serialized_wire_data} = "$s$args->{serialized_message}";
537             return;
538             }
539              
540             sub serialize_wire_meta {
541             my $args = shift;
542             my $s = '';
543             foreach my $key (keys %$args) {
544             my $translated_key = $wire_header_arg_translate->{$key};
545             if($translated_key and $wire_header_args->{$translated_key}) {
546             if($wire_header_args->{$translated_key} == 1) {
547             $s = "$s$translated_key=$args->{$key},";
548             } elsif($wire_header_args->{$translated_key}->{$args->{$key}}) {
549             $s = "$s$translated_key=$args->{$key},";
550             } else {
551             die "passed wire argument $translated_key had value of $args->{$translated_key} not of allowed type";
552             }
553             }
554             }
555             chop $s; #no trailing ,
556             my $l = length $s;
557             return "$l:$s";
558             }
559              
560             sub deserialize_wire_meta {
561             my $header = shift;
562             my $ret = {};
563             foreach my $part (split ',', $header) {
564             my ($key, $val) = split '=', $part;
565             $ret->{$key} = $val;
566             }
567             return $ret;
568             }
569              
570             {
571             my $encoder;
572             sub _freeze {
573             my $args = shift;
574             $encoder = Sereal::Encoder->new() unless $encoder;
575             if(not defined $args->{serializer} and $ENV{IPC_TRANSIT_DEFAULT_SERIALIZER}) {
576             $args->{serializer} = $ENV{IPC_TRANSIT_DEFAULT_SERIALIZER};
577             }
578             if(not defined $args->{serializer} or $args->{serializer} eq 'json') {
579             return encode_json $args->{message};
580             } elsif($args->{serializer} eq 'sereal') {
581             return $encoder->encode($args->{message});
582             } elsif($args->{serializer} eq 'dumper') {
583             return Data::Dumper::Dumper $args->{message};
584             } elsif($args->{serializer} eq 'storable') {
585             return Storable::freeze $args->{message};
586             } else {
587             die "_freeze: undefined serializer: $args->{serializer}";
588             }
589             }
590             }
591              
592             sub _thaw {
593             my $args = shift;
594             my $ret = eval {
595             die 'passed serialized_message is falsy'
596             unless $args->{serialized_message};
597             if(not defined $args->{wire_headers}->{s} or $args->{wire_headers}->{s} eq 'sereal') {
598             if(looks_like_sereal($args->{serialized_message})) {
599             return $args->{message} = decode_sereal($args->{serialized_message});
600             } else {
601             return $args->{message} = decode_json($args->{serialized_message});
602             }
603             } elsif($args->{wire_headers}->{s} eq 'json') {
604             return $args->{message} = decode_json($args->{serialized_message});
605             } elsif($args->{wire_headers}->{s} eq 'dumper') {
606             our $VAR1;
607             eval $args->{serialized_message};
608             return $args->{message} = $VAR1;
609             } elsif($args->{wire_headers}->{s} eq 'storable') {
610             return $args->{message} = Storable::thaw($args->{serialized_message});
611             } else {
612             die "undefined serializer: $args->{wire_headers}->{s}";
613             }
614             };
615             my $err = $@;
616             if($err) {
617             if($args->{serialized_message}) {
618             print STDERR "_thaw: failed: $err: $args->{serialized_message}\n";
619             } else {
620             print STDERR "_thaw: failed: $err: <undef>\n";
621             }
622             }
623             return $ret;
624             }
625              
626             sub gen_key_pair {
627             my ($pk, $sk) = box_keypair();
628             return (encode_base64($pk),encode_base64($sk));
629             }
630              
631             {
632             my $hostname;
633             sub _get_my_hostname {
634             return $IPC::Transit::my_hostname if $IPC::Transit::my_hostname;
635             return $hostname if $hostname;
636             { my $ret = `hostname -f 2> /dev/null`;
637             chomp $ret;
638             if(length($ret) > 5) {
639             $hostname = $ret;
640             }
641             }
642             $hostname = hostname unless $hostname;
643             return $hostname;
644             }
645             }
646             1;
647              
648             __END__
649              
650             =head1 NAME
651              
652             IPC::Transit - A framework for high performance message passing
653              
654             =head1 NOTES
655              
656             The serialization is currently hard-coded to https://metacpan.org/pod/Sereal
657              
658             =head1 SYNOPSIS
659              
660             use strict;
661             use IPC::Transit;
662             IPC::Transit::send(qname => 'test', message => { a => 'b' });
663              
664             #...the same or a different process on the same machine
665             my $message = IPC::Transit::receive(qname => 'test');
666              
667             #remote transit
668             remote-transitd & #run 'outgoing' transitd gateway
669             IPC::Transit::send(qname => 'test', message => { a => 'b' }, destination => 'some.other.box.com');
670              
671             #On 'some.other.box.com':
672             plackup --port 9816 $(which remote-transit-gateway.psgi) & #run 'incoming' transitd gateway
673             my $message = IPC::Transit::receive(qname => 'test');
674              
675             =head1 DESCRIPTION
676              
677             This queue framework has the following goals:
678            
679             =over 4
680              
681             =item * Serverless
682              
683             =item * High Throughput
684              
685             =item * Usually Low Latency
686              
687             =item * Relatively Good Reliability
688              
689             =item * CPU and Memory efficient
690              
691             =item * Cross UNIX Implementation
692              
693             =item * Multiple Language Compability
694              
695             =item * Very few module dependencies
696              
697             =item * Supports old version of Perl
698              
699             =item * Feature stack is modular and optional
700              
701             =back
702              
703             This queue framework has the following anti-goals:
704              
705             =over 4
706              
707             =item * Guaranteed Delivery
708              
709             =back
710              
711             =head1 FUNCTIONS
712              
713             =head2 send(qname => 'some_queue', message => $hashref, [destination => $destination, serializer => 'some serializer', crypto => 1 ])
714              
715             This sends $hashref to 'some_queue'. some_queue may be on the local
716             box, or it may be in the same process space as the caller.
717              
718             This call will block until the destination queue has enough space to
719             handle the serialized message.
720              
721             The destination argument is optional. If defined, it is the remote host
722             will receive the message.
723              
724             The serialize argument is optional, and defaults to Sereal. It is
725             over-ridden with the IPC_TRANSIT_DEFAULT_SERIALIZER environmental
726             variable. The following serializers are available:
727              
728             serial, json, yaml, storable, dumper
729              
730             NB: there is no need to define the serialization type in receive. It is
731             automatically detected and utilized.
732              
733             The crypto argument is optional. See below for details.
734              
735             =head2 receive(qname => 'some_queue', nonblock => [0|1], override_local => [0|1])
736              
737             This function fetches a hash reference from 'some_queue' and returns it.
738             By default, it will block until a reference is available. Setting nonblock
739             to a true value will cause this to return immediately with 'undef' is
740             no messages are available.
741              
742             override_local defaults to false; if set to true, the receive will always
743             do a non-process local receive.
744              
745              
746             =head2 stat(qname => 'some_queue')
747              
748             Returns various stats about the passed queue name, per IPC::Msg::stat:
749              
750             print Dumper IPC::Transit::stat(qname => 'test');
751             $VAR1 = {
752             'ctime' => 1335141770,
753             'cuid' => 1000,
754             'lrpid' => 0,
755             'uid' => 1000,
756             'lspid' => 0,
757             'mode' => 438,
758             'qnum' => 0,
759             'cgid' => 1000,
760             'rtime' => 0,
761             'qbytes' => 16384,
762             'stime' => 0,
763             'gid' => 1000
764             }
765              
766             =head2 stats()
767              
768             Return an array of hash references, each containing the information
769             obtained by the stat() call, one entry for each queue on the system.
770              
771             =head2 CRYPTO
772              
773             On send(), if the crypto argument is set, IPC::Transit will sign and
774             encrypt the message before it is sent. The necessary configs, including
775             relevant keys, are set in some global variables.
776              
777             See an actual example of this in action under ex/crypto.pl
778              
779             Please note that this module does not directly assist with the always
780             onerous task of key distribution.
781              
782             =head3 $IPC::Transit::my_hostname
783              
784             If not set, this defaults to the output of the module Sys::Hostname.
785             This value is placed into the message by the sender, and used by the
786             receiver to lookup the public key of the sender.
787              
788             =head3 $IPC::Transit::my_keys
789              
790             This is a hash reference initially populated, in the attribute 'default',
791             with the private half of a default key pair. For actual secure
792             communication, a new key pair must be generated on both sides, and the
793             sender's private key needs to be placed here:
794              
795             $IPC::Transit::my_keys->{private} = $real_private_key
796              
797             =head3 $IPC::Transit::public_keys
798              
799             As above, this is a hash reference initially populated, in the attribute
800             'default', with the public half of a default key pair. For actual secure
801             communication, a new key pair must be generated on both sides, and the
802             receiver's public key needs to be placed here:
803              
804             $IPC::Transit::public_keys->{$receiver_hostname} = $real_public_key_from_receiver
805              
806             $receiver_hostname must exactly match what is passed into the 'destination'
807             field of send().
808              
809             All of these keys must be base 64 encoded 32 byte primes, as used by
810             the Crypto::Sodium package.
811              
812             =head3 IPC::Transit::gen_key_pair()
813              
814             This returns a two element array representing a public/privte key pair,
815             properly base64 encoded for use in $IPC::Transit::my_keys and
816             $IPC::Transit::public_keys
817              
818             =head1 SEE ALSO
819              
820             A zillion other queueing systems.
821              
822             =head1 TODO
823              
824             Implement nonblock flag for send()
825              
826             =head1 BUGS
827              
828             Patches, flames, opinions, enhancement ideas are all welcome.
829              
830             I am not satisfied with not supporting Windows, but it is considered
831             secondary. I am open to the possibility of adding abstractions for this
832             kind of support as long as it doesn't impact the primary goals.
833              
834             =head1 COPYRIGHT
835              
836             Copyright (c) 2012, 2013, 2016 Dana M. Diederich. All Rights Reserved.
837              
838             =head1 LICENSE
839              
840             This module is free software. It may be used, redistributed
841             and/or modified under the terms of the Perl Artistic License
842             (see http://www.perl.com/perl/misc/Artistic.html)
843              
844             =head1 AUTHOR
845              
846             Dana M. Diederich <dana@realms.org>
847              
848             =cut