File Coverage

lib/IPC/Transit.pm
Criterion Covered Total %
statement 39 41 95.1
branch n/a
condition n/a
subroutine 14 14 100.0
pod n/a
total 53 55 96.3


line stmt bran cond sub pod time code
1             package IPC::Transit;
2             $IPC::Transit::VERSION = '1.161400';
3 16     16   6746 use strict;use warnings;
  16     16   15  
  16         355  
  16         45  
  16         14  
  16         302  
4 16     16   228 use 5.006;
  16         37  
5 16     16   3866 use IPC::Transit::Internal;
  16         58  
  16         389  
6 16     16   8545 use Storable;
  16         34645  
  16         740  
7 16     16   4637 use Data::Dumper;
  16         41896  
  16         695  
8 16     16   67 use JSON;
  16         12  
  16         82  
9 16     16   1362 use Sereal::Encoder;
  16         21  
  16         505  
10 16     16   55 use Sereal::Decoder qw(looks_like_sereal decode_sereal);
  16         13  
  16         694  
11 16     16   6019 use Sys::Hostname;
  16         11575  
  16         672  
12 16     16   7062 use HTTP::Lite;
  16         114537  
  16         484  
13 16     16   10262 use File::Temp qw/tempfile/;
  16         197071  
  16         1096  
14 16     16   5966 use Tie::DNS;
  16         777906  
  16         531  
15 16     16   2753 use Crypt::Sodium;
  0            
  0            
16             use MIME::Base64;
17              
18             use vars qw(
19             $config_file $config_dir $large_transit_message_dir
20             $local_queues
21             );
22              
23             $IPC::Transit::my_keys = {
24             default => 'ftlMCefNymrF66r2VlFBgHYbWRZqSJPzVg4Vz/I86UQ='
25             };
26             $IPC::Transit::public_keys = {
27             default => 'vbqcxUUGIOvIKzpFWyBbYrSTsmSGj+/zlkF9H3tJ0DI='
28             };
29              
30             our $large_transit_message_dir = '/tmp/transit_large_messages'
31             unless $IPC::Transit::large_transit_message_dir;
32              
33             ##sorry, gotta have this temp dir in a known location
34             mkdir $large_transit_message_dir unless -d $large_transit_message_dir;
35             chmod 0777, $large_transit_message_dir; ##sorry, it has to be 0777 :(
36              
37             our $wire_header_arg_translate = {
38             destination => 'd',
39             destination_qname => 'q',
40             compression => 'c',
41             serializer => 's',
42             message_length => 'l',
43             local_filename => 'f',
44             ttl => 't',
45             nonce => 'n',
46             source => 'S',
47             };
48             our $max_message_size = 4096 unless $IPC::Transit::max_message_size;
49              
50             {
51             my %dns;
52             my $cache;
53             my $ts;
54             sub cached_dns {
55             my $thing = shift;
56             return $thing unless $thing;
57             if(not $ts) {
58             $ts = time;
59             $cache = {};
60             tie %dns, 'Tie::DNS' unless %dns;
61             }
62             if(time > $ts + 10) {
63             $ts = time;
64             $cache = {};
65             }
66             my $ret = eval {
67             local $SIG{ALRM} = sub { die "timed out\n"; };
68             alarm 2;
69             return $dns{$thing};
70             };
71             alarm 0;
72             $ret = $thing unless $ret;
73             return $ret;
74             }
75             }
76              
77             #This validates some allowed values of wire header arguments
78             our $wire_header_args = {
79             s => { #serializer
80             json => 1,
81             sereal => 1,
82             yaml => 1,
83             storable => 1,
84             dumper => 1,
85             },
86             c => { #compression
87             zlib => 1,
88             snappy => 1,
89             none => 1
90             },
91             d => 1, #destination address
92             t => 1, #hop TTL
93             q => 1, #destination qname
94             l => 1, #length of the message itself
95             f => 1, #local_filename, optionally a path on the filesystem where the message can be found
96             t => 1, #Time To Live
97             #for crypto
98             n => 1, #nonce
99             S => 1, #source
100             };
101             our $std_args = {
102             message => 1,
103             qname => 1,
104             nowait => 1,
105             encrypt => 1,
106             };
107              
108             sub send {
109             my %args;
110             { my @args = @_;
111             die 'IPC::Transit::send: even number of arguments required'
112             if scalar @args % 2;
113             %args = @args;
114             }
115             my $qname = $args{qname};
116             die "IPC::Transit::send: parameter 'qname' required"
117             unless $qname;
118             die "IPC::Transit::send: parameter 'qname' must be a scalar"
119             if ref $qname;
120             my $message = $args{message};
121             die "IPC::Transit::send: parameter 'message' required"
122             unless $message;
123             die "IPC::Transit::send: parameter 'message' must be a HASH reference"
124             if ref $message ne 'HASH';
125             $message->{'.ipc_transit_meta'} = {} unless $message->{'.ipc_transit_meta'};
126             $message->{'.ipc_transit_meta'}->{send_ts} = time;
127             if($args{encrypt} and not $args{destination}) {
128             die "IPC::Transit::send: parameter 'destination' must exist if encryption is selected";
129             }
130             if($args{destination}) {
131             #let's take a stab at efficiently getting destination to either an
132             #IP address or a FQDN
133             #If destination has less then tree .'s in it, then we will do a DNS
134             #lookup on it, and if that returns anything, replace it
135             if(not $args{no_dns_normalize}) {
136             if($args{destination} =~ tr/\./\./ < 3) {
137             my $new = cached_dns($args{destination});
138             $args{destination} = $new if $new;
139             }
140             }
141             $args{destination_qname} = $args{qname};
142             $args{qname} = 'transitd';
143             $args{ttl} = '9' unless $args{ttl};
144              
145             return _deliver_non_local($qname, \%args);
146             }
147              
148             #begin the hard work of figuring out if this message should be sent as
149             #local delivery or not.
150             #overall default is to non-local delivery
151              
152             #the overrides in .ipc_transit_meta in the message takes precidence
153             #over previous calls to ::local_queue and/or ::no_local_queue
154              
155             #insides of overrides, the force_local and force_non_local
156             #take precidence over the default_to.
157              
158             #algo
159             #1. absolute override goes to the invocation: override_local/
160             # override_non_local
161             #2. next, look at force_* in the message. If they conflict, then we go
162             # with force_non_local.
163             #3. lacking any instructions there, we go with the default_to directive,
164             # if any, in the message
165             #4. lacking that, we go with what's been set with ::local_queue and/or
166             # ::no_local_queue
167             #5. And finally, non-local delivery
168              
169              
170             #1a:
171             return _deliver_non_local($qname, \%args) if $args{override_local};
172              
173             #1b:
174             return _deliver_local($qname, \%args) if $args{override_non_local};
175              
176             #2a:
177             if( $message->{'.ipc_transit_meta'}->{overrides} and
178             $message->{'.ipc_transit_meta'}->{overrides}->{force_non_local} and
179             $message->{'.ipc_transit_meta'}->{overrides}->{force_non_local}->{$args{qname}}) {
180             return _deliver_non_local($qname, \%args);
181             }
182              
183             #2b:
184             if( $message->{'.ipc_transit_meta'}->{overrides} and
185             $message->{'.ipc_transit_meta'}->{overrides}->{force_local} and
186             $message->{'.ipc_transit_meta'}->{overrides}->{force_local}->{$args{qname}}) {
187             return _deliver_local($qname, \%args);
188             }
189              
190             #3a:
191             if( $message->{'.ipc_transit_meta'}->{overrides} and
192             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} and
193             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} eq 'local'
194             ) {
195             return _deliver_local($qname, \%args);
196             }
197              
198             #3b:
199             if( $message->{'.ipc_transit_meta'}->{overrides} and
200             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} and
201             $message->{'.ipc_transit_meta'}->{overrides}->{default_to} eq 'non-local'
202             ) {
203             return _deliver_non_local($qname, \%args);
204             }
205              
206             #4:
207             if( $local_queues and
208             $local_queues->{$qname}) {
209             return _deliver_local($qname, \%args);
210             }
211              
212             #5:
213             return _deliver_non_local($qname, \%args);
214             }
215              
216             sub _deliver_local {
217             my ($qname, $args) = @_;
218             push @{$local_queues->{$qname}}, $args;
219             return $args;
220             }
221              
222             sub _get_tmp_file {
223             my ($fh, $filename) = tempfile(SUFFIX => '.transit', DIR => $large_transit_message_dir);
224             die 'failed to create tmpfile' unless -e $filename;
225             return ($fh, $filename);
226             }
227              
228              
229             sub _deliver_non_local {
230             my ($qname, $args) = @_;
231             my $to_queue = IPC::Transit::Internal::_initialize_queue(%$args);
232             eval {
233             local $SIG{ALRM} = sub { die "timed out\n"; };
234             alarm 2;
235             pack_message($args);
236             };
237             alarm 0;
238             if($@) {
239             print STDERR "IPC::Transit::_deliver_non_local: pack_message failed: $@\n";
240             unlink $args->{local_filename}
241             if $args->{local_filename} and -e $args->{local_filename};
242             return undef;
243             }
244             my $ret = $to_queue->snd(1,$args->{serialized_wire_data}, IPC::Transit::Internal::_get_flags('nonblock'));
245             unlink $args->{local_filename}
246             if not $ret and $args->{local_filename};
247             return $ret;
248             }
249              
250             sub stats {
251             my $info = IPC::Transit::Internal::_stats();
252             return $info;
253             }
254             sub stat {
255             my %args;
256             { my @args = @_;
257             die 'IPC::Transit::stat: even number of arguments required'
258             if scalar @args % 2;
259             %args = @args;
260             }
261             my $qname = $args{qname};
262             if(not $args{override_local} and $local_queues and $local_queues->{$qname}) {
263             return {
264             qnum => scalar @{$local_queues->{$qname}}
265             };
266             }
267             die "IPC::Transit::stat: parameter 'qname' required"
268             unless $qname;
269             die "IPC::Transit::stat: parameter 'qname' must be a scalar"
270             if ref $qname;
271             my $info = IPC::Transit::Internal::_stat(%args);
272             }
273              
274             sub receive {
275             my %args;
276             { my @args = @_;
277             die 'IPC::Transit::receive: even number of arguments required'
278             if scalar @args % 2;
279             %args = @args;
280             }
281             my $qname = $args{qname};
282              
283             die "IPC::Transit::receive: parameter 'qname' required"
284             unless $qname;
285             die "IPC::Transit::receive: parameter 'qname' must be a scalar"
286             if ref $qname;
287             if( not $args{override_local} and
288             $local_queues and
289             $local_queues->{$qname}) {
290             my $m = shift @{$local_queues->{$qname}};
291             return $m->{message};
292             }
293             my $ret = eval {
294             my $flags = IPC::Transit::Internal::_get_flags('nowait') if $args{nonblock};
295             my $from_queue = IPC::Transit::Internal::_initialize_queue(%args);
296             my $ref = { #just doing this so we can pass the possibly big serialized
297             #data around as a reference
298             serialized_wire_data => '',
299             };
300             if(not $from_queue->rcv($ref->{serialized_wire_data}, 102400000, 0, $flags)) {
301             return undef;
302             }
303             if(not defined $ref->{serialized_wire_data}) {
304             print STDERR "IPC::Transit::receive: received message had no data";
305             return undef;
306             }
307              
308             my ($header_length, $wire_headers) = _parse_wire_header($ref);
309             if(not defined $wire_headers) {
310             print STDERR 'IPC::Transit::receive: received message had no wire headers: ' . substr($ref->{serialized_wire_data}, 0, 30) . "\n";
311             return undef;
312             }
313             if(not defined $header_length) {
314             print STDERR 'IPC::Transit::receive: received message had no header length: ' . substr($ref->{serialized_wire_data}, 0, 30) . "\n";
315             return undef;
316             }
317             sync_serialized_wire_data($wire_headers, $ref);
318              
319             my $message = {
320             wire_headers => $wire_headers,
321             serialized_message => substr(
322             $ref->{serialized_wire_data},
323             $header_length + length($header_length) + 1,
324             9999999, # :(
325             ),
326             };
327             my $used_default_public = 1;
328             if($message->{wire_headers}->{n}) {
329             #we be encrypted
330             #validate $IPC::Transit::my_keys->{private}
331             #
332             my $source = $message->{wire_headers}->{S};
333             my $public_key;
334             if($IPC::Transit::public_keys->{$source}) {
335             $public_key = $IPC::Transit::public_keys->{$source};
336             $used_default_public = 0;
337             } else {
338             $public_key = $IPC::Transit::public_keys->{default};
339             }
340             my $private_key = $IPC::Transit::my_keys->{private};
341             $private_key = $IPC::Transit::my_keys->{default}
342             unless $private_key;
343             my $nonce = decode_base64($message->{wire_headers}->{n});
344             my $public_keys;
345             if(not ref $public_key) {
346             $public_keys = [$public_key];
347             } else {
348             $public_keys = $public_key;
349             }
350             my $cleartext;
351             foreach my $public (@$public_keys) {
352             $cleartext = crypto_box_open(
353             $message->{serialized_message},
354             $nonce,
355             decode_base64($public),
356             decode_base64($private_key),
357             );
358             last if $cleartext;
359             }
360             $message->{serialized_message} = $cleartext;
361             }
362             return undef unless _thaw($message);
363             $message->{message}->{'.ipc_transit_meta'}->{encrypt_source} =
364             $message->{wire_headers}->{S} if $message->{wire_headers}->{S};
365             $message->{message}->{'.ipc_transit_meta'}->{encrypt_source} = 'default'
366             if $used_default_public;
367             return $message if $args{raw};
368             return $message->{message};
369             };
370             die $@ if $@;
371             return $ret;
372             }
373              
374             sub sync_serialized_wire_data {
375             my ($wire_headers, $ref) = @_;
376             if($wire_headers->{f} and -r $wire_headers->{f}) {
377             eval {
378             local $SIG{ALRM} = sub { die "timed out\n"; };
379             alarm 5;
380             open my $fh, '<', $wire_headers->{f}
381             or die "failed to open $wire_headers->{f} for reading: $!";
382             read $fh, $ref->{serialized_wire_data}, 1024000000
383             or die "failed to read from $wire_headers->{f}: $!";
384             close $fh or die "failed to close $wire_headers->{f}: $!";
385             };
386             alarm 0;
387             unlink $wire_headers->{f};
388             }
389             }
390              
391             sub post_remote {
392             #This is very simple, first-generation logic. It assumes that every
393             #message that is received that has a qname set is destined for off box.
394              
395             #so here, we want to post this message to the destination over http
396             my $message = shift;
397             my $http = HTTP::Lite->new;
398             my $vars = {
399             message => $message->{serialized_wire_data},
400             };
401             $http->prepare_post($vars);
402             my $url = 'http://' . $message->{message}->{'.ipc_transit_meta'}->{destination} . ':9816/message';
403             my $req;
404             eval {
405             $req = $http->request($url)
406             or die "Unable to get document: $!";
407             };
408             print STDERR "IPC::Transit::post_remote: (\$url=$url) failed: $@\n" if $@;
409             return $req;
410             }
411              
412             sub no_local_queue {
413             my %args;
414             { my @args = @_;
415             die 'IPC::Transit::no_local_queue: even number of arguments required'
416             if scalar @args % 2;
417             %args = @args;
418             }
419             my $qname = $args{qname};
420             delete $local_queues->{$qname};
421             return 1;
422             }
423              
424             sub queue_exists {
425             my $qname = shift;
426             return IPC::Transit::Internal::_queue_exists($qname);
427             }
428              
429             sub _parse_wire_header {
430             my $ref = shift;
431             if($ref->{serialized_wire_data} !~ /^(\d+)/sm) {
432             print STDERR 'IPC::Transit::_parse_wire_header: malformed message received: ' . substr($ref->{serialized_wire_data}, 0, 60) . "\n";
433             return (undef, undef);
434             }
435             my $header_length = $1;
436             return (
437             $header_length,
438             deserialize_wire_meta(
439             substr( $ref->{serialized_wire_data},
440             length($header_length) + 1,
441             $header_length
442             )
443             ),
444             );
445             }
446             sub local_queue {
447             my %args;
448             { my @args = @_;
449             die 'IPC::Transit::local_queue: even number of arguments required'
450             if scalar @args % 2;
451             %args = @args;
452             }
453             my $qname = $args{qname};
454             $local_queues = {} unless $local_queues;
455             $local_queues->{$qname} = [] unless $local_queues->{$qname};
456             return 1;
457             }
458              
459             sub pack_message {
460             my $args = shift;
461             $args->{message}->{'.ipc_transit_meta'} = {}
462             unless $args->{message}->{'.ipc_transit_meta'};
463             foreach my $key (keys %$wire_header_arg_translate) {
464             next unless $args->{$key};
465             $args->{$wire_header_arg_translate->{$key}} = $args->{$key};
466             }
467             foreach my $key (keys %$args) {
468             next if $wire_header_args->{$key};
469             next if $std_args->{$key};
470             $args->{message}->{'.ipc_transit_meta'}->{$key} = $args->{$key};
471             }
472             if($args->{encrypt}) {
473             $args->{message}->{'.ipc_transit_meta'}->{destination} = $args->{destination};
474             }
475             if($args->{encrypt}) {
476             my $sender = _get_my_hostname();
477             if(not $sender) {
478             die 'encrypt selected but unable to determine hostname. Set $IPC::Transit::my_hostname to override';
479             }
480             }
481             if($args->{encrypt}) {
482             my $nonce = crypto_box_nonce();
483             $args->{nonce} = encode_base64($nonce);
484              
485             my $my_private_key;
486             if($IPC::Transit::my_keys->{private}) {
487             $my_private_key = $IPC::Transit::my_keys->{private};
488             $args->{message}->{'.ipc_transit_meta'}->{signed_destination} = 'my_private';
489             } else {
490             $my_private_key = $IPC::Transit::my_keys->{default};
491             $args->{message}->{'.ipc_transit_meta'}->{signed_destination} = 'default';
492             }
493             my $their_public_key;
494             if($IPC::Transit::public_keys->{$args->{destination}}) {
495             $their_public_key = $IPC::Transit::public_keys->{$args->{destination}};
496             } else {
497             $their_public_key = $IPC::Transit::public_keys->{default};
498             }
499             $args->{serialized_message} = _freeze($args);
500             my $cipher_text = crypto_box(
501             $args->{serialized_message},
502             $nonce,
503             decode_base64($their_public_key),
504             decode_base64($my_private_key)
505             );
506             $args->{serialized_message} = $cipher_text;
507             $args->{source} = _get_my_hostname();
508             } else {
509             $args->{serialized_message} = _freeze($args);
510             }
511             $args->{message_length} = length $args->{serialized_message};
512             if($args->{message_length} > $IPC::Transit::max_message_size) {
513             my $s;
514             eval {
515             my $fh;
516             ($fh, $args->{local_filename}) = _get_tmp_file();
517             $s = serialize_wire_meta($args);
518             print $fh "$s$args->{serialized_message}"
519             or die "failed to write to file $args->{local_filename}: $!";
520             close $fh or die "failed to close $args->{local_filename}: $!";
521             chmod 0666, $args->{local_filename};
522             };
523             if($@) {
524             unlink $args->{local_filename};
525             die "IPC::Transit::pack_message: failed: $@";
526             }
527             $args->{serialized_wire_data} = $s;
528             return;
529             }
530             my $s = serialize_wire_meta($args);
531             $args->{serialized_wire_data} = "$s$args->{serialized_message}";
532             return;
533             }
534              
535             sub serialize_wire_meta {
536             my $args = shift;
537             my $s = '';
538             foreach my $key (keys %$args) {
539             my $translated_key = $wire_header_arg_translate->{$key};
540             if($translated_key and $wire_header_args->{$translated_key}) {
541             if($wire_header_args->{$translated_key} == 1) {
542             $s = "$s$translated_key=$args->{$key},";
543             } elsif($wire_header_args->{$translated_key}->{$args->{$key}}) {
544             $s = "$s$translated_key=$args->{$key},";
545             } else {
546             die "passed wire argument $translated_key had value of $args->{$translated_key} not of allowed type";
547             }
548             }
549             }
550             chop $s; #no trailing ,
551             my $l = length $s;
552             return "$l:$s";
553             }
554              
555             sub deserialize_wire_meta {
556             my $header = shift;
557             my $ret = {};
558             foreach my $part (split ',', $header) {
559             my ($key, $val) = split '=', $part;
560             $ret->{$key} = $val;
561             }
562             return $ret;
563             }
564              
565             {
566             my $encoder;
567             sub _freeze {
568             my $args = shift;
569             $encoder = Sereal::Encoder->new() unless $encoder;
570             if(not defined $args->{serializer} and $ENV{IPC_TRANSIT_DEFAULT_SERIALIZER}) {
571             $args->{serializer} = $ENV{IPC_TRANSIT_DEFAULT_SERIALIZER};
572             }
573             if(not defined $args->{serializer} or $args->{serializer} eq 'json') {
574             return encode_json $args->{message};
575             } elsif($args->{serializer} eq 'sereal') {
576             return $encoder->encode($args->{message});
577             } elsif($args->{serializer} eq 'dumper') {
578             return Data::Dumper::Dumper $args->{message};
579             } elsif($args->{serializer} eq 'storable') {
580             return Storable::freeze $args->{message};
581             } else {
582             die "_freeze: undefined serializer: $args->{serializer}";
583             }
584             }
585             }
586              
587             sub _thaw {
588             my $args = shift;
589             my $ret = eval {
590             die 'passed serialized_message is falsy'
591             unless $args->{serialized_message};
592             if(not defined $args->{wire_headers}->{s} or $args->{wire_headers}->{s} eq 'sereal') {
593             if(looks_like_sereal($args->{serialized_message})) {
594             return $args->{message} = decode_sereal($args->{serialized_message});
595             } else {
596             return $args->{message} = decode_json($args->{serialized_message});
597             }
598             } elsif($args->{wire_headers}->{s} eq 'json') {
599             return $args->{message} = decode_json($args->{serialized_message});
600             } elsif($args->{wire_headers}->{s} eq 'dumper') {
601             our $VAR1;
602             eval $args->{serialized_message};
603             return $args->{message} = $VAR1;
604             } elsif($args->{wire_headers}->{s} eq 'storable') {
605             return $args->{message} = Storable::thaw($args->{serialized_message});
606             } else {
607             die "undefined serializer: $args->{wire_headers}->{s}";
608             }
609             };
610             my $err = $@;
611             if($err) {
612             if($args->{serialized_message}) {
613             print STDERR "_thaw: failed: $err: $args->{serialized_message}\n";
614             } else {
615             print STDERR "_thaw: failed: $err: <undef>\n";
616             }
617             }
618             return $ret;
619             }
620              
621             sub gen_key_pair {
622             my ($pk, $sk) = box_keypair();
623             return (encode_base64($pk),encode_base64($sk));
624             }
625              
626             {
627             my $hostname;
628             sub _get_my_hostname {
629             return $IPC::Transit::my_hostname if $IPC::Transit::my_hostname;
630             return $hostname if $hostname;
631             $hostname = hostname;
632             return $hostname;
633             }
634             }
635             1;
636              
637             __END__
638              
639             =head1 NAME
640              
641             IPC::Transit - A framework for high performance message passing
642              
643             =head1 NOTES
644              
645             The serialization is currently hard-coded to https://metacpan.org/pod/Sereal
646              
647             =head1 SYNOPSIS
648              
649             use strict;
650             use IPC::Transit;
651             IPC::Transit::send(qname => 'test', message => { a => 'b' });
652              
653             #...the same or a different process on the same machine
654             my $message = IPC::Transit::receive(qname => 'test');
655              
656             #remote transit
657             remote-transitd & #run 'outgoing' transitd gateway
658             IPC::Transit::send(qname => 'test', message => { a => 'b' }, destination => 'some.other.box.com');
659              
660             #On 'some.other.box.com':
661             plackup --port 9816 $(which remote-transit-gateway.psgi) & #run 'incoming' transitd gateway
662             my $message = IPC::Transit::receive(qname => 'test');
663              
664             =head1 DESCRIPTION
665              
666             This queue framework has the following goals:
667            
668             =over 4
669              
670             =item * Serverless
671              
672             =item * High Throughput
673              
674             =item * Usually Low Latency
675              
676             =item * Relatively Good Reliability
677              
678             =item * CPU and Memory efficient
679              
680             =item * Cross UNIX Implementation
681              
682             =item * Multiple Language Compability
683              
684             =item * Very few module dependencies
685              
686             =item * Supports old version of Perl
687              
688             =item * Feature stack is modular and optional
689              
690             =back
691              
692             This queue framework has the following anti-goals:
693              
694             =over 4
695              
696             =item * Guaranteed Delivery
697              
698             =back
699              
700             =head1 FUNCTIONS
701              
702             =head2 send(qname => 'some_queue', message => $hashref, [destination => $destination, serializer => 'some serializer', crypto => 1 ])
703              
704             This sends $hashref to 'some_queue'. some_queue may be on the local
705             box, or it may be in the same process space as the caller.
706              
707             This call will block until the destination queue has enough space to
708             handle the serialized message.
709              
710             The destination argument is optional. If defined, it is the remote host
711             will receive the message.
712              
713             The serialize argument is optional, and defaults to Sereal. It is
714             over-ridden with the IPC_TRANSIT_DEFAULT_SERIALIZER environmental
715             variable. The following serializers are available:
716              
717             serial, json, yaml, storable, dumper
718              
719             NB: there is no need to define the serialization type in receive. It is
720             automatically detected and utilized.
721              
722             The crypto argument is optional. See below for details.
723              
724             =head2 receive(qname => 'some_queue', nonblock => [0|1], override_local => [0|1])
725              
726             This function fetches a hash reference from 'some_queue' and returns it.
727             By default, it will block until a reference is available. Setting nonblock
728             to a true value will cause this to return immediately with 'undef' is
729             no messages are available.
730              
731             override_local defaults to false; if set to true, the receive will always
732             do a non-process local receive.
733              
734              
735             =head2 stat(qname => 'some_queue')
736              
737             Returns various stats about the passed queue name, per IPC::Msg::stat:
738              
739             print Dumper IPC::Transit::stat(qname => 'test');
740             $VAR1 = {
741             'ctime' => 1335141770,
742             'cuid' => 1000,
743             'lrpid' => 0,
744             'uid' => 1000,
745             'lspid' => 0,
746             'mode' => 438,
747             'qnum' => 0,
748             'cgid' => 1000,
749             'rtime' => 0,
750             'qbytes' => 16384,
751             'stime' => 0,
752             'gid' => 1000
753             }
754              
755             =head2 stats()
756              
757             Return an array of hash references, each containing the information
758             obtained by the stat() call, one entry for each queue on the system.
759              
760             =head2 CRYPTO
761              
762             On send(), if the crypto argument is set, IPC::Transit will sign and
763             encrypt the message before it is sent. The necessary configs, including
764             relevant keys, are set in some global variables.
765              
766             See an actual example of this in action under ex/crypto.pl
767              
768             Please note that this module does not directly assist with the always
769             onerous task of key distribution.
770              
771             =head3 $IPC::Transit::my_hostname
772              
773             If not set, this defaults to the output of the module Sys::Hostname.
774             This value is placed into the message by the sender, and used by the
775             receiver to lookup the public key of the sender.
776              
777             =head3 $IPC::Transit::my_keys
778              
779             This is a hash reference initially populated, in the attribute 'default',
780             with the private half of a default key pair. For actual secure
781             communication, a new key pair must be generated on both sides, and the
782             sender's private key needs to be placed here:
783              
784             $IPC::Transit::my_keys->{private} = $real_private_key
785              
786             =head3 $IPC::Transit::public_keys
787              
788             As above, this is a hash reference initially populated, in the attribute
789             'default', with the public half of a default key pair. For actual secure
790             communication, a new key pair must be generated on both sides, and the
791             receiver's public key needs to be placed here:
792              
793             $IPC::Transit::public_keys->{$receiver_hostname} = $real_public_key_from_receiver
794              
795             $receiver_hostname must exactly match what is passed into the 'destination'
796             field of send().
797              
798             All of these keys must be base 64 encoded 32 byte primes, as used by
799             the Crypto::Sodium package.
800              
801             =head3 IPC::Transit::gen_key_pair()
802              
803             This returns a two element array representing a public/privte key pair,
804             properly base64 encoded for use in $IPC::Transit::my_keys and
805             $IPC::Transit::public_keys
806              
807             =head1 SEE ALSO
808              
809             A zillion other queueing systems.
810              
811             =head1 TODO
812              
813             Implement nonblock flag for send()
814              
815             =head1 BUGS
816              
817             Patches, flames, opinions, enhancement ideas are all welcome.
818              
819             I am not satisfied with not supporting Windows, but it is considered
820             secondary. I am open to the possibility of adding abstractions for this
821             kind of support as long as it doesn't impact the primary goals.
822              
823             =head1 COPYRIGHT
824              
825             Copyright (c) 2012, 2013, 2016 Dana M. Diederich. All Rights Reserved.
826              
827             =head1 LICENSE
828              
829             This module is free software. It may be used, redistributed
830             and/or modified under the terms of the Perl Artistic License
831             (see http://www.perl.com/perl/misc/Artistic.html)
832              
833             =head1 AUTHOR
834              
835             Dana M. Diederich <dana@realms.org>
836              
837             =cut