line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Message::Passing::ZeroMQ; |
2
|
1
|
|
|
1
|
|
1185
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
52
|
|
3
|
1
|
|
|
1
|
|
7
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
44
|
|
4
|
1
|
|
|
1
|
|
1852
|
use ZeroMQ qw/ :all /; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
use POSIX::AtFork (); |
6
|
|
|
|
|
|
|
use Sub::Name; |
7
|
|
|
|
|
|
|
use namespace::clean -except => 'meta'; |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
our $VERSION = "0.007"; |
10
|
|
|
|
|
|
|
$VERSION = eval $VERSION; |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
our @_WITH_CONTEXTS; |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
POSIX::AtFork->add_to_prepare(subname at_fork => sub { |
15
|
|
|
|
|
|
|
foreach my $thing (grep { defined $_ } @_WITH_CONTEXTS) { |
16
|
|
|
|
|
|
|
$thing->_clear_ctx; |
17
|
|
|
|
|
|
|
} |
18
|
|
|
|
|
|
|
@_WITH_CONTEXTS = (); |
19
|
|
|
|
|
|
|
}); |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
1; |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
=head1 NAME |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
Message::Passing::ZeroMQ - input and output messages to ZeroMQ. |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=head1 SYNOPSIS |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
# Terminal 1: |
30
|
|
|
|
|
|
|
$ message-passing --input STDIN --output ZeroMQ --output_options '{"connect":"tcp://127.0.0.1:5552"}' |
31
|
|
|
|
|
|
|
{"data":{"some":"data"},"@metadata":"value"} |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
# Terminal 2: |
34
|
|
|
|
|
|
|
$ message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}' |
35
|
|
|
|
|
|
|
{"data":{"some":"data"},"@metadata":"value"} |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
=head1 DESCRIPTION |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
A L<ZeroMQ> transport for L<Message::Passing>. |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
Designed for use as a log transport and aggregation mechanism for perl applications, allowing you |
42
|
|
|
|
|
|
|
to aggregate structured and non-structured log messages across the network in a non-blocking manner. |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
Clients (i.e. users of the L<Message::Passing::Output::ZeroMQ> class) connect to a server (i.e. a user of the |
45
|
|
|
|
|
|
|
L<Message::Passing::Input::ZeroMQ> class) via ZeroMQ's pub/sub sockets. These are set up to be lossy and non-blocking, |
46
|
|
|
|
|
|
|
meaning that if the log-receiver process is down or slow, then the application will queue a small (and configurable) |
47
|
|
|
|
|
|
|
amount of logs on its side, and after that log messages will be dropped. |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
Whilst throwing away log messages isn't a good thing to do, or something that you want to happen regularly, |
50
|
|
|
|
|
|
|
in many (especially web application) contexts, network logging being a single point of failure is |
51
|
|
|
|
|
|
|
not acceptable from a reliability and graceful degradation standpoint. |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
The application grinding to a halt as a non-essential centralised resource is unavailable (e.g. the log aggregation |
54
|
|
|
|
|
|
|
server) is significantly less acceptable than the loss of non-essential logging data. |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
=head1 HOW TO USE |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
In your application emitting messages, you can either use L<Message::Passing::Output::ZeroMQ> directly, |
59
|
|
|
|
|
|
|
or you can use it via L<Log::Dispatch::Message::Passing>. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
use Log::Dispatch; |
62
|
|
|
|
|
|
|
use Log::Dispatch::Message::Passing; |
63
|
|
|
|
|
|
|
use Message::Passing::Output::ZeroMQ; |
64
|
|
|
|
|
|
|
use Message::Passing::Filter::Encode::JSON; |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
my $log = Log::Dispatch->new; |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
$log->add(Log::Dispatch::Message::Passing->new( |
69
|
|
|
|
|
|
|
name => 'myapp_aggregate_log', |
70
|
|
|
|
|
|
|
min_level => 'debug', |
71
|
|
|
|
|
|
|
output => Message::Passing::Filter::Encode::JSON->new( |
72
|
|
|
|
|
|
|
output_to => Message::Passing::Output::ZeroMQ->new( |
73
|
|
|
|
|
|
|
connect => 'tcp://192.168.0.1:5222', |
74
|
|
|
|
|
|
|
) |
75
|
|
|
|
|
|
|
), |
76
|
|
|
|
|
|
|
)); |
77
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
$log->warn($_) for qw/ foo bar baz /; |
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
On your log aggregation server, just run the message-passing utility: |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \ |
83
|
|
|
|
|
|
|
--output File --output_options '{"filename":"/tmp/my_test.log"}' |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
=head1 SOCKET TYPES |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
ZeroMQ supports multiple socket types, the only ones used in Message::Passing::ZeroMQ are: |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
=head2 PUB/SUB |
90
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
Used for general message distribution - you can have either multiple producers (PUB) |
92
|
|
|
|
|
|
|
which connect to one consumer (SUB), or multiple consumers (SUB) which connect to one |
93
|
|
|
|
|
|
|
producer (PUB). |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
All consumers will get a copy of every message. |
96
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
In Message::Passing terms, L<Message::Passing::Input::ZeroMQ> is for SUB sockets, and |
98
|
|
|
|
|
|
|
L<Message::Passing::Output::ZeroMQ> is for PUB sockets. |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=head2 PUSH/PULL |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
Used for message distribution. A server (PUSH) distributes messages between |
103
|
|
|
|
|
|
|
a number of connecting clients (PULL). |
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
In Message::Passing terms, L<Message::Passing::Input::ZeroMQ> is for PULL sockets, and |
106
|
|
|
|
|
|
|
L<Message::Passing::Output::ZeroMQ> is for PUSH sockets. |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
=head1 CONNECTION DIRECTION |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
Note that in ZeroMQ, the connection direction and the direction of message flow can be |
111
|
|
|
|
|
|
|
entirely opposite. I.e. a client can connect to a server and send messages to it, or |
112
|
|
|
|
|
|
|
receive messages from it (depending on the direction of the socket types). |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
=head1 CONNECTION ATTRIBUTES |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
Both L<Message::Passing::Input::ZeroMQ> and L<Message::Passing::Output::ZeroMQ> support |
117
|
|
|
|
|
|
|
either binding a server or connecting to a remote host, due to the fact that ZeroMQ connections |
118
|
|
|
|
|
|
|
can be in any direction, as noted above. |
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
Therefore, each input or output should have one (but not both!) of the following attributes: |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
=head2 connect |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
Connects to a remote server, e.g. C<< tcp://192.168.0.1:5222 >> |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
=head2 socket_bind |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
Binds a server and waits for connections from clients, e.g. C<< tcp://*:5222 >> |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
=head2 socket_type |
131
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
This defaults to C<SUB> for L<Message::Passing::Input::ZeroMQ> and C<PUB> for |
133
|
|
|
|
|
|
|
L<Message::Passing::Output::ZeroMQ>, however you can override it to C<PUSH>/C<PULL> as |
134
|
|
|
|
|
|
|
appropriate for your use case if desired. |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
=head1 MORE COMPLEX EXAMPLES |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
With this in mind, we can easily create a system which aggregates messages from |
139
|
|
|
|
|
|
|
multiple publishers, and passes them out (in a round-robin fashion) to a pool of workers. |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
# The message distributor: |
142
|
|
|
|
|
|
|
message-passing --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5222"}' \ |
143
|
|
|
|
|
|
|
--output ZeroMQ --output_options '{"socket_bind":"tcp://*:5223","socket_type":"PUSH"}' |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# Workers |
146
|
|
|
|
|
|
|
{ |
147
|
|
|
|
|
|
|
package MyApp::MessageWorker; |
148
|
|
|
|
|
|
|
use Moo; |
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
with 'Message::Passing::Role::Filter'; |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
sub filter { |
153
|
|
|
|
|
|
|
my ($self, $message) = @_; |
154
|
|
|
|
|
|
|
# .... process the message in any way you want here |
155
|
|
|
|
|
|
|
return undef; # Do not output the message.. |
156
|
|
|
|
|
|
|
} |
157
|
|
|
|
|
|
|
} |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
message-passing --input ZeroMQ --input_options '{"connect":"tcp://127.0.0.1:5223","socket_type":"PULL"}' \ |
160
|
|
|
|
|
|
|
--filter '+MyApp::MessageWorker' \ |
161
|
|
|
|
|
|
|
--output STDOUT |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
You log messages into the distributor as per the above simple example, and you can run multiple worker |
164
|
|
|
|
|
|
|
processes.. |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
Less trivial setups could/would emit messages on error, or maybe re-emit the incoming message after transforming it |
167
|
|
|
|
|
|
|
in some way. |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
=head1 SEE ALSO |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
For more detailed information about ZeroMQ and how it works, please consult the ZeroMQ guide and the other links below: |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
=over |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
=item L<Message::Passing::Input::ZeroMQ> |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
=item L<Message::Passing::Output::ZeroMQ> |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
=item L<Message::Passing> |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
=item L<ZeroMQ> |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
=item L<http://www.zeromq.org/> |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
=item L<http://zguide.zeromq.org/page:all> |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
=back |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
=head1 AUTHOR |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
Tomas (t0m) Doran |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
=head1 SPONSORSHIP |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
This module exists due to the wonderful people at Suretec Systems Ltd. |
196
|
|
|
|
|
|
|
who sponsored its development for its |
197
|
|
|
|
|
|
|
VoIP division called SureVoIP for use with |
198
|
|
|
|
|
|
|
the SureVoIP API - |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
=head1 COPYRIGHT |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
Copyright Suretec Systems 2012. |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
=head1 LICENSE |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
GNU Affero General Public License, Version 3 |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
If you feel this is too restrictive to be able to use this software, |
210
|
|
|
|
|
|
|
please talk to us as we'd be willing to consider re-licensing under |
211
|
|
|
|
|
|
|
less restrictive terms. |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
=cut |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
1; |
216
|
|
|
|
|
|
|
|