File Coverage

blib/lib/Coro/MP.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Coro::MP - erlang-style multi-processing/message-passing framework
4              
5             =head1 SYNOPSIS
6              
7             use Coro::MP;
8              
9             # exports everything that AnyEvent::MP exports as well.
10             # new stuff compared to AnyEvent::MP:
11              
12             # creating/using ports from threads
13             my $port = port_async {
14             # thread context, $SELF is set to $port
15              
16             # returning will "kil" the $port with an empty reason
17             };
18              
19             # attach to an existing port
20             spawn $NODE, "::initfunc";
21             sub ::initfunc {
22             rcv_async $SELF, sub {
23             ...
24             };
25             }
26              
27             # simple "tag" receives:
28             my ($pid) = get "pid", 30
29             or die "no pid message received after 30s";
30              
31             # conditional receive
32             my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
33             my @next_msg = get_cond { 1 } 30; # 30s timeout
34              
35             # run thread in port context
36             peval_async $port, sub {
37             die "kill the port\n";
38             };
39              
40             # synchronous "cal"
41             my @retval = syncal 30, $port, tag => $data;
42              
43             =head1 DESCRIPTION
44              
45             This module (-family) implements a simple message passing framework.
46              
47             Despite its simplicity, you can securely message other processes running
48             on the same or other hosts, and you can supervise entities remotely.
49              
50             This module depends heavily on L<AnyEvent::MP>, in fact, many functions
51             exported by this module are identical to AnyEvent::MP functions. This
52             module family is simply the Coro API to AnyEvent::MP.
53              
54             Care has been taken to stay compatible with AnyEvent::MP, even if
55             sometimes this required a less natural API (C<spawn> should indeed spawn a
56             thread, not just call an initfunc for example).
57              
58             For an introduction to AnyEvent::MP, see the L<AnyEvent::MP::Intro> manual
59             page.
60              
61             =head1 VARIABLES/FUNCTIONS
62              
63             =over 4
64              
65             =cut
66              
67             package Coro::MP;
68              
69 1     1   677 use common::sense;
  1         2  
  1         7  
70              
71 1     1   47 use Carp ();
  1         1  
  1         19  
72              
73 1     1   441 use AnyEvent::MP::Kernel;
  0            
  0            
74             use AnyEvent::MP;
75             use Coro;
76             use Coro::AnyEvent ();
77              
78             use AE ();
79              
80             use base "Exporter";
81              
82             our $VERSION = "0.1";
83              
84             our @EXPORT = (@AnyEvent::MP::EXPORT, qw(
85             port_async rcv_async get get_cond syncal peval_async
86             )); # exported by default: everything AnyEvent::MP exports plus the Coro::MP functions listed above
87             our @EXPORT_OK = (@AnyEvent::MP::EXPORT_OK); # optional exports are taken over from AnyEvent::MP unchanged
88              
89             sub _new_coro { # internal helper: start a coro running $threadcb and tie its lifetime to $port
90             my ($port, $threadcb) = @_;
91              
92             my $coro = async_pool {
93             eval { $threadcb->() }; # trap exceptions thrown by the thread body
94             kil $SELF, die => $@ if $@; # on an exception, kil $SELF with the reason (die => error)
95             };
96             $coro->swap_sv (\$SELF, \$port); # presumably makes $SELF read as $port while this coro runs - confirm against Coro's swap_sv docs
97              
98             # killing the port cancels the coro
99             # delaying kil messages inside aemp guarantees
100             # (hopefully) that $coro != $Coro::current.
101             mon $port, sub { $coro->cancel (@_) }; # the monitor callback's arguments are handed on to cancel
102              
103             # cancelling the coro kills the port
104             $coro->on_destroy (sub { kil $port, @_ }); # the on_destroy arguments become the kil reason
105              
106             $coro # return value: the newly created coro object
107             }
108              
109             =item NODE, $NODE, node_of, configure
110              
111             =item $SELF, *SELF, SELF, %SELF, @SELF...
112              
113             =item snd, mon, kil, psub
114              
115             These variables and functions work exactly as in AnyEvent::MP, in fact,
116             they are exactly the same functions, and are used in much the same way.
117              
118             =item rcv
119              
120             This function works exactly as C<AnyEvent::MP::rcv>, and is in fact
121             compatible with Coro::MP ports. However, the canonical way to receive
122             messages with Coro::MP is to use C<get> or C<get_cond>.
123              
124             =item port
125              
126             This function is exactly the same as C<AnyEvent::MP::port> and creates new
127             ports. You can attach a thread to them by calling C<rcv_async> or you can
128             do a create and attach in one operation using C<port_async>.
129              
130             =item peval
131              
132             This function works exactly as C<AnyEvent::MP::peval> - you could use it to
133             run callbacks within a port context (good for monitoring), but you cannot
134             C<get> messages unless the callback executes within the thread attached to
135             the port.
136              
137             Since creating a thread with port context requires somewhat annoying
138             syntax, there is a C<peval_async> function that handles that for you - note
139             that within such a thread, you still cannot C<get> messages.
140              
141             =item spawn
142              
143             This function is identical to C<AnyEvent::MP::spawn>. This means that
144             it doesn't spawn a new thread as one would expect, but simply calls an
145             init function. The init function, however, can attach a new thread easily:
146              
147             sub initfun {
148             my (@args) = @_;
149              
150             rcv_async $SELF, sub {
151             # thread-code
152             };
153             }
154              
155             =item cal
156              
157             This function is identical to C<AnyEvent::MP::cal>. The easiest way to
158             make a synchronous call is to use Coro's rouse functionality:
159              
160             # send 1, 2, 3 to $port and wait up to 30s for reply
161             cal $port, 1, 2, 3, rouse_cb, 30;
162             my @reply = rouse_wait;
163              
164             You can also use C<syncal> if you want, and are ok with learning yet
165             another function with a weird name:
166              
167             my @reply = syncal 30, $port, 1, 2, 3;
168              
169             =item $local_port = port_async { ... }
170              
171             Creates a new local port, and returns its ID. A new thread is created and
172             attached to the port (see C<rcv_async>, below, for details).
173              
174             =cut
175              
176             sub rcv_async($$); # forward declaration: port_async below calls rcv_async before its definition
177              
178             sub port_async(;&) { # create a new local port id and, if a block is given, attach a thread running it
179             my $id = "$UNIQ." . $ID++; # $UNIQ and $ID are not declared in this file - presumably imported from AnyEvent::MP::Kernel; verify
180             my $port = "$NODE#$id"; # the port id is the node id, a "#", and the freshly generated local id
181              
182             @_ # was a block passed?
183             ? rcv_async $port, shift # yes: attach a thread that runs the block
184             : AnyEvent::MP::rcv $port, undef; # no: only register the port, with undef as the callback
185              
186             $port # return value: the new port id
187             }
188              
189             =item rcv_async $port, $threadcb
190              
191             This function creates and attaches a thread on a port. The thread is set
192             to execute C<$threadcb> and is put into the ready queue. The thread will
193             receive all messages not filtered away by tagged receive callbacks (as set
194             by C<AnyEvent::MP::rcv>) - it simply replaces the default callback of an
195             AnyEvent::MP port.
196              
197             The special variable C<$SELF> will be set to C<$port> during thread
198             execution.
199              
200             When C<$threadcb> returns or the thread is canceled, the return/cancel
201             values become the C<kil> reason.
202              
203             It is not allowed to call C<rcv_async> more than once on a given port.
204              
205             =cut
206              
207             sub rcv_async($$) { # attach a new thread running $threadcb to $port and queue incoming messages for it
208             my ($port, $threadcb) = @_;
209              
210             my (@queue, $coro); # both are captured by the receive callback below
211              
212             AnyEvent::MP::rcv $port, sub { # receive callback registered for $port
213             push @queue, \@_; # TODO, take copy?
214             $coro->ready; # TODO, maybe too many unwanted wake-ups?
215             };
216              
217             $coro = _new_coro $port, $threadcb; # assigned after the callback is registered; the callback reads $coro only when it is invoked
218             $coro->{_coro_mp_queue} = \@queue; # get and get_cond look the queue up via $Coro::current->{_coro_mp_queue}
219             }
220              
221             =item @msg = get $tag
222              
223             =item @msg = get $tag, $timeout
224              
225             Find, dequeue and return the next message with the specified C<$tag>. If
226             no matching message is currently queued, wait up to C<$timeout> seconds
227             (or forever if no C<$timeout> has been specified or it is C<undef>) for
228             one to arrive.
229              
230             Returns the message with the initial tag removed. In case of a timeout,
231             the empty list. The function I<must> be called in list context.
232              
233             Note that empty messages cannot be distinguished from a timeout when using
234             C<get>.
235              
236             Example: send a "log" message to C<$SELF> and then get and print it.
237              
238             snd $SELF, log => "text";
239             my ($text) = get "log";
240             print "log message: $text\n";
241              
242             Example: receive C<p1> and C<p2> messages, regardless of the order they
243             arrive in on the port.
244              
245             my @p1 = get "p1";
246             my @p2 = get "p2";
247              
248             Example: assume a message with tag C<now> is already in the queue and fetch
249             it. If no message was there, do not wait, but die.
250              
251             my @msg = get "now", 0
252             or die "expected now message to be there, but it wasn't";
253              
254             =cut
255              
256             sub get($;$) { # dequeue and return the first queued message whose first element eq $tag, optionally waiting up to $timeout
257             my ($tag, $timeout) = @_;
258              
259             my $queue = $Coro::current->{_coro_mp_queue} # the queue is stored on the coro by rcv_async
260             or Carp::croak "Coro::MP::get called from thread not attached to any port";
261              
262             my $i; # index at which the next scan starts; the timer callback below undefs it to signal a timeout
263              
264             while () {
265             $queue->[$_][0] eq $tag # compare the message's first element with the wanted tag
266             and return @{ splice @$queue, $_, 1 } # NOTE(review): this returns every element of the removed message, element 0 (the tag) included, while the POD says the tag is removed - confirm which is intended
267             for $i..$#$queue; # only messages not examined by an earlier pass are scanned
268              
269             $i = @$queue; # everything queued so far has been examined
270              
271             # wait for more messages
272             if (ref $timeout) { # waiting was already set up on an earlier pass ($timeout now holds a timer watcher or the dummy ref)
273             schedule; # block until this coro is readied by the receive callback or the timer
274             defined $i or return; # timeout
275              
276             } elsif (defined $timeout) { # first pass with a numeric timeout
277             $timeout or return; # a timeout of 0 means: do not wait at all
278              
279             my $current = $Coro::current;
280             $timeout = AE::timer $timeout, 0, sub { # the watcher replaces the number in $timeout; the ref test above relies on it being a reference
281             undef $i; # marks the wake-up as a timeout
282             $current->ready;
283             };
284             } else { # no timeout given: wait indefinitely
285             $timeout = \$i; # dummy
286             }
287             }
288             }
289              
290             =item @msg = get_cond { condition... } [$timeout]
291              
292             Similarly to C<get>, looks for a matching message. Unlike C<get>,
293             "matching" is not defined by a tag alone, but by a predicate, a piece of
294             code that is executed on each candidate message in turn, with C<@_> set to
295             the message contents.
296              
297             The predicate code is supposed to return the empty list if the message
298             didn't match. If it returns anything else, then the message is removed
299             from the queue and returned to the caller.
300              
301             In addition, if the predicate returns a code reference, then it is
302             immediately invoked on the removed message.
303              
304             If a C<$timeout> is specified and is not C<undef>, then, after this many
305             seconds have been passed without a matching message arriving, the empty
306             list will be returned.
307              
308             Example: fetch the next message, wait as long as necessary.
309              
310             my @msg = get_cond { 1 };
311              
312             Example: fetch the next message whose tag starts with C<group1_>.
313              
314             my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
315              
316             Example: check whether a message with tag C<child_exit> and a second
317             element of C<$pid> is in the queue already.
318              
319             if (
320             my (undef, $pid, $status) =
321             get_cond {
322             $_[0] eq "child_exit" && $_[1] == $pid
323             } 0
324             ) {
325             warn "child $pid did exit with status $status\n";
326             }
327              
328             Example: implement a server that reacts to C<log>, C<exit> and C<reverse>
329             messages, and exits after 30 seconds of idling.
330              
331             my $reverser = port_async {
332             while() {
333             get_cond {
334             $_[0] eq "exit" and return sub {
335             last; # yes, this is valid
336             };
337             $_[0] eq "log" and return sub {
338             print "log: $_[1]\n";
339             };
340             $_[0] eq "reverse" and return sub {
341             my (undef, $text, @reply) = @_;
342             snd @reply, scalar reverse $text;
343             };
344              
345             die "unexpected message $_[0] received";
346             } 30
347             or last;
348             }
349             };
350              
351             =cut
352              
353             sub _true { 1 } # default predicate: accepts every message
354              
355             sub get_cond(;&$) { # dequeue and return the first queued message accepted by the predicate $cond, optionally waiting up to $timeout
356             my ($cond, $timeout) = @_;
357              
358             my $queue = $Coro::current->{_coro_mp_queue} # the queue is stored on the coro by rcv_async
359             or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";
360              
361             my ($i, $ok); # $i: index at which the next scan starts (undef'd by the timer to signal a timeout); $ok: the predicate's result
362              
363             $cond ||= \&_true; # without a predicate, any message matches
364              
365             while () {
366             do
367             {
368             local *_ = $queue->[$_]; # alias @_ to the candidate message so the predicate sees it as its arguments
369             if ($ok = &$cond) { # &$cond without parentheses passes the current @_ through
370             splice @$queue, $_, 1; # NOTE(review): "local *_" above localizes the whole glob - confirm that $_ is still the loop index here
371             &$ok if "CODE" eq ref $ok; # a code ref returned by the predicate is called with the same @_
372             return @_; # the full message, as aliased above
373             }
374             }
375             for $i..$#$queue; # only messages not examined by an earlier pass are scanned
376              
377             $i = @$queue; # everything queued so far has been examined
378              
379             # wait for more messages
380             if (ref $timeout) { # waiting was already set up on an earlier pass ($timeout now holds a timer watcher or the dummy ref)
381             schedule; # block until this coro is readied by the receive callback or the timer
382             defined $i or return; # timeout
383              
384             } elsif (defined $timeout) { # first pass with a numeric timeout
385             $timeout or return; # a timeout of 0 means: do not wait at all
386              
387             my $current = $Coro::current;
388             $timeout = AE::timer $timeout, 0, sub { # the watcher replaces the number in $timeout; the ref test above relies on it being a reference
389             undef $i; # marks the wake-up as a timeout
390             $current->ready;
391             };
392             } else { # no timeout given: wait indefinitely
393             $timeout = \$i; # dummy
394             }
395             }
396             }
397              
398             =item $async = peval_async { BLOCK }
399              
400             Sometimes you want to run a thread within a port context, for error
401             handling.
402              
403             This function creates a new, ready, thread (using C<Coro::async_pool>), sets
404             C<$SELF> to the current value of C<$SELF> while it is executing, and
405             calls the given BLOCK.
406              
407             This is very similar to C<Coro::async> - note that while the BLOCK executes in
408             C<$SELF> port context, you cannot call C<get>, as C<$SELF> can only be
409             attached to one thread.
410              
411             Example: execute some Coro::AIO code concurrently in another thread, but
412             make sure any errors C<kil> the originating port.
413              
414             port_async {
415             ...
416             peval_async {
417             # $SELF set, but cannot call get etc. here
418              
419             my $fh = aio_open ...
420             or die "open: $!";
421              
422             aio_close $fh;
423             };
424             };
425              
426             =cut
427              
428             sub peval_async($$) { # takes ($port, $threadcb); NOTE(review): the POD documents a single-BLOCK form "peval_async { BLOCK }", which this prototype does not accept - confirm the intended interface
429             _new_coro $_[0], $_[1] # run the callback in a new coro tied to the port; the coro from _new_coro is the return value
430             }
431              
432             =item @reply = syncal $timeout, $port, @msg
433              
434             The synchronous form of C<cal>, a simple form of RPC - it sends a message
435             to the given C<$port> with the given contents (C<@msg>), but adds a reply
436             port to the message.
437              
438             The reply port is created temporarily just for the purpose of receiving
439             the reply, and will be C<kil>ed when no longer needed.
440              
441             Then it will wait until a reply message arrives, which will be returned to
442             the caller.
443              
444             If the C<$timeout> is defined, then after this many seconds, when no
445             message has arrived, the port will be C<kil>ed and an empty list will be
446             returned.
447              
448             If the C<$timeout> is undef, then the local port will monitor the remote
449             port instead, so it eventually gets cleaned-up.
450              
451             Example: call the string reverse example from C<get_cond>.
452              
453             my $reversed = syncal 1, $reverser, reverse => "Rotator";
454              
455             =cut
456              
457             sub syncal($@) { # synchronous cal: the first argument is the timeout, the rest is handed to cal
458             my ($timeout, @msg) = @_;
459              
460             cal @msg, Coro::rouse_cb, $timeout; # cal receives the remaining arguments, then a rouse callback, then the timeout
461             Coro::rouse_wait # return value: whatever rouse_wait returns for that callback
462             }
463              
464             =back
465              
466             =head1 SEE ALSO
467              
468             L<AnyEvent::MP::Intro> - a gentle introduction.
469              
470             L<AnyEvent::MP> - like Coro::MP, but event-based.
471              
472             L<AnyEvent>.
473              
474             =head1 AUTHOR
475              
476             Marc Lehmann
477             http://home.schmorp.de/
478              
479             =cut
480              
481             1
482