line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Message::Passing::ZeroMQ; |
2
|
1
|
|
|
1
|
|
1151
|
use strict; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
30
|
|
3
|
1
|
|
|
1
|
|
4
|
use warnings; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
28
|
|
4
|
1
|
|
|
1
|
|
448
|
use POSIX::AtFork (); |
|
1
|
|
|
|
|
610
|
|
|
1
|
|
|
|
|
26
|
|
5
|
1
|
|
|
1
|
|
723
|
use Sub::Name; |
|
1
|
|
|
|
|
748
|
|
|
1
|
|
|
|
|
87
|
|
6
|
1
|
|
|
1
|
|
837
|
use namespace::clean -except => 'meta'; |
|
1
|
|
|
|
|
22129
|
|
|
1
|
|
|
|
|
5
|
|
7
|
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
# Distribution version, declared as a string literal.
our $VERSION = "0.010"; |
9
|
|
|
|
|
|
|
# Re-evaluate the version string as Perl code so $VERSION holds the evaluated value of the literal.
$VERSION = eval $VERSION; |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
# Package-global list of objects whose _clear_ctx method is called by the fork hook below.
# Nothing visible here adds to it - presumably the input/output classes register themselves; TODO confirm.
our @_WITH_CONTEXTS; |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# Register a POSIX::AtFork "prepare" callback, named at_fork via Sub::Name's subname.
# The callback calls _clear_ctx on every registered object and then empties the registry.
POSIX::AtFork->add_to_prepare(subname at_fork => sub { |
14
|
|
|
|
|
|
|
# Skip undefined slots (NOTE(review): presumably weakened references that have gone away - confirm).
foreach my $thing (grep { defined $_ } @_WITH_CONTEXTS) { |
15
|
|
|
|
|
|
|
$thing->_clear_ctx; |
16
|
|
|
|
|
|
|
} |
17
|
|
|
|
|
|
|
# Forget every registered object once its context has been cleared.
@_WITH_CONTEXTS = (); |
18
|
|
|
|
|
|
|
}); |
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
1; |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
=head1 NAME |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
Message::Passing::ZeroMQ - input and output messages to ZeroMQ. |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
=head1 SYNOPSIS |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
# Terminal 1: |
29
|
|
|
|
|
|
|
$ message-passing --input STDIN --output ZeroMQ --output_options '{"connect":"tcp://127.0.0.1:5552"}' |
30
|
|
|
|
|
|
|
{"data":{"some":"data"},"@metadata":"value"} |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
# Terminal 2: |
33
|
|
|
|
|
|
|
$ message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}' |
34
|
|
|
|
|
|
|
{"data":{"some":"data"},"@metadata":"value"} |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
=head1 DESCRIPTION |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
A L<ZeroMQ> transport for L<Message::Passing>. |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
Designed for use as a log transport and aggregation mechanism for perl applications, allowing you |
41
|
|
|
|
|
|
|
to aggregate structured and non-structured log messages across the network in a non-blocking manner. |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
Clients (i.e. users of the L<Message::Passing::Output::ZeroMQ> class) connect to a server (i.e. a user of the |
44
|
|
|
|
|
|
|
L<Message::Passing::Input::ZeroMQ> class) via ZeroMQ's pub/sub sockets. These are set up to be lossy and non-blocking, |
45
|
|
|
|
|
|
|
meaning that if the log-receiver process is down or slow, then the application will queue a small (and configurable) |
46
|
|
|
|
|
|
|
amount of logs on its side, and after that log messages will be dropped. |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
Whilst throwing away log messages isn't a good thing to do, or something that you want to happen regularly, |
49
|
|
|
|
|
|
|
in many (especially web application) contexts, network logging being a single point of failure is |
50
|
|
|
|
|
|
|
not acceptable from a reliability and graceful degradation standpoint. |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
The application grinding to a halt as a non-essential centralised resource is unavailable (e.g. the log aggregation |
53
|
|
|
|
|
|
|
server) is significantly less acceptable than the loss of non-essential logging data. |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
=head1 HOW TO USE |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
In your application emitting messages, you can either use L<Message::Passing::Output::ZeroMQ> directly, |
58
|
|
|
|
|
|
|
or you can use it via L<Log::Dispatch::Message::Passing>. |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
use Log::Dispatch; |
61
|
|
|
|
|
|
|
use Log::Dispatch::Message::Passing; |
62
|
|
|
|
|
|
|
use Message::Passing::Output::ZeroMQ; |
63
|
|
|
|
|
|
|
use Message::Passing::Filter::Encode::JSON; |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
my $log = Log::Dispatch->new; |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
$log->add(Log::Dispatch::Message::Passing->new( |
68
|
|
|
|
|
|
|
name => 'myapp_aggregate_log', |
69
|
|
|
|
|
|
|
min_level => 'debug', |
70
|
|
|
|
|
|
|
output => Message::Passing::Filter::Encode::JSON->new( |
71
|
|
|
|
|
|
|
output_to => Message::Passing::Output::ZeroMQ->new( |
72
|
|
|
|
|
|
|
connect => 'tcp://192.168.0.1:5222', |
73
|
|
|
|
|
|
|
) |
74
|
|
|
|
|
|
|
), |
75
|
|
|
|
|
|
|
)); |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
$log->warn($_) for qw/ foo bar baz /; |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
On your log aggregation server, just run the message-passing utility: |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \ |
82
|
|
|
|
|
|
|
--output File --output_options '{"filename":"/tmp/my_test.log"}' |
83
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
=head1 SOCKET TYPES |
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
ZeroMQ supports multiple socket types, the only ones used in Message::Passing::ZeroMQ are: |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=head2 PUB/SUB |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
Used for general message distribution - you can have either multiple producers (PUB) |
91
|
|
|
|
|
|
|
which connect to one consumer (SUB), or multiple consumers (SUB) which connect to one |
92
|
|
|
|
|
|
|
producer (PUB). |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
All consumers will get a copy of every message. |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
In Message::Passing terms, L<Message::Passing::Input::ZeroMQ> is for SUB sockets, and |
97
|
|
|
|
|
|
|
L<Message::Passing::Output::ZeroMQ> is for PUB sockets. |
98
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
=head2 PUSH/PULL |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
Used for message distribution. A server (PUSH) distributes messages between |
102
|
|
|
|
|
|
|
a number of connecting clients (PULL). |
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
In Message::Passing terms, L<Message::Passing::Input::ZeroMQ> is for PULL sockets, and |
105
|
|
|
|
|
|
|
L<Message::Passing::Output::ZeroMQ> is for PUSH sockets. |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=head1 CONNECTION DIRECTION |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
Note that in ZeroMQ, the connection direction and the direction of message flow can be |
110
|
|
|
|
|
|
|
entirely opposite, i.e. a client can connect to a server and send messages to it, or |
111
|
|
|
|
|
|
|
receive messages from it (depending on the direction of the socket types). |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
=head1 CONNECTION ATTRIBUTES |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
Both L<Message::Passing::Input::ZeroMQ> and L<Message::Passing::Output::ZeroMQ> support |
116
|
|
|
|
|
|
|
either binding a server or connecting to a remote host, due to the fact that ZeroMQ connections |
117
|
|
|
|
|
|
|
can be in any direction, as noted above. |
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
Therefore, each input or output should have one (but not both!) of the following attributes: |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
=head2 connect |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
Connects to a remote server, e.g. C<< tcp://192.168.0.1:5222 >> |
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
=head2 socket_bind |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
Binds a server and waits for connections from clients, e.g. C<< tcp://*:5222 >> |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
=head2 socket_type |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
This defaults to C<SUB> for L<Message::Passing::Input::ZeroMQ> and C<PUB> for |
132
|
|
|
|
|
|
|
L<Message::Passing::Output::ZeroMQ>, however you can override it to C<PUSH>/C<PULL> as |
133
|
|
|
|
|
|
|
appropriate for your use case if desired. |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
=head1 MORE COMPLEX EXAMPLES |
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
With this in mind, we can easily create a system which aggregates messages from |
138
|
|
|
|
|
|
|
multiple publishers, and passes them out (in a round-robin fashion) to a pool of workers. |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
# The message distributor: |
141
|
|
|
|
|
|
|
message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \ |
142
|
|
|
|
|
|
|
--output ZeroMQ --output_options '{"socket_bind":"tcp://*:5223","socket_type":"PUSH"}' |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
# Workers |
145
|
|
|
|
|
|
|
{ |
146
|
|
|
|
|
|
|
package MyApp::MessageWorker; |
147
|
|
|
|
|
|
|
use Moo; |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
with 'Message::Passing::Role::Filter'; |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
sub filter { |
152
|
|
|
|
|
|
|
my ($self, $message) = @_; |
153
|
|
|
|
|
|
|
# .... process the message in any way you want here |
154
|
|
|
|
|
|
|
return undef; # Do not output the message.. |
155
|
|
|
|
|
|
|
} |
156
|
|
|
|
|
|
|
} |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
message-passing --input ZeroMQ --input_options '{"connect":"tcp://127.0.0.1:5223","socket_type":"PULL"}' \ |
159
|
|
|
|
|
|
|
--filter '+MyApp::MessageWorker' \ |
160
|
|
|
|
|
|
|
--output STDOUT |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
You log messages into the distributor as per the above simple example, and you can run multiple worker |
163
|
|
|
|
|
|
|
processes. |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
Less trivial setups could/would emit messages on error, or maybe re-emit the incoming message after transforming it |
166
|
|
|
|
|
|
|
in some way. |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
=head1 SEE ALSO |
169
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
For more detailed information about ZeroMQ and how it works, please consult the ZeroMQ guide and the other links below: |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
=over |
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
=item L<Message::Passing::Output::ZeroMQ> |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
=item L<Message::Passing::Input::ZeroMQ> |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
=item L<Message::Passing> |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
=item L<ZMQ::FFI> |
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
=item L<http://www.zeromq.org/> |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
=item L<http://zguide.zeromq.org/page:all> |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
=back |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
=head1 AUTHOR |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
Tomas (t0m) Doran <bobtfish@bobtfish.net> |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
=head1 SPONSORSHIP |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
This module exists due to the wonderful people at Suretec Systems Ltd. |
195
|
|
|
|
|
|
|
<http://www.suretecsystems.com/> who sponsored its development for its |
196
|
|
|
|
|
|
|
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with |
197
|
|
|
|
|
|
|
the SureVoIP API - |
198
|
|
|
|
|
|
|
<http://www.surevoip.co.uk/support/wiki/api_documentation> |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=head1 COPYRIGHT |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
Copyright Suretec Systems 2012. |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
=head1 LICENSE |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
GNU Affero General Public License, Version 3 |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
If you feel this is too restrictive to be able to use this software, |
209
|
|
|
|
|
|
|
please talk to us as we'd be willing to consider re-licensing under |
210
|
|
|
|
|
|
|
less restrictive terms. |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
=cut |
213
|
|
|
|
|
|
|
|
214
|
|
|
|
|
|
|
1; |
215
|
|
|
|
|
|
|
|