File Coverage

blib/lib/Async/Selector.pm
Criterion Covered Total %
statement 126 127 99.2
branch 43 44 97.7
condition 3 3 100.0
subroutine 22 22 100.0
pod 9 13 69.2
total 203 209 97.1


line stmt bran cond sub pod time code
1             package Async::Selector;
2              
3 15     15   495041 use 5.006;
  15         58  
  15         596  
4 15     15   88 use strict;
  15         26  
  15         506  
5 15     15   81 use warnings;
  15         36  
  15         495  
6              
7 15     15   139 use Carp;
  15         39  
  15         2234  
8 15     15   17150 use Async::Selector::Watcher;
  15         161  
  15         29717  
9              
10              
11             =pod
12              
13             =head1 NAME
14              
15             Async::Selector - level-triggered resource observer like select(2)
16              
17              
18             =head1 VERSION
19              
20             1.03
21              
22             =cut
23              
24             our $VERSION = "1.03";
25              
26              
27             =pod
28              
29             =head1 SYNOPSIS
30              
31              
32             use Async::Selector;
33            
34             my $selector = Async::Selector->new();
35            
36             ## Register resource
37             my $resource = "some text."; ## 10 bytes
38            
39             $selector->register(resource_A => sub {
40             ## If length of $resource is more than or equal to $threshold bytes, provide it.
41             my $threshold = shift;
42             return length($resource) >= $threshold ? $resource : undef;
43             });
44            
45            
46             ## Watch the resource with a callback.
47             $selector->watch(
48             resource_A => 20, ## When the resource gets more than or equal to 20 bytes...
49             sub { ## ... execute this callback.
50             my ($watcher, %resource) = @_;
51             print "$resource{resource_A}\n";
52             $watcher->cancel();
53             }
54             );
55            
56            
57             ## Append data to the resource
58             $resource .= "data"; ## 14 bytes
59             $selector->trigger('resource_A'); ## Nothing happens
60            
61             $resource .= "more data"; ## 23 bytes
62             $selector->trigger('resource_A'); ## The callback prints 'some text.datamore data'
63              
64              
65             =head1 DESCRIPTION
66              
67             L is an object that watches registered resources
68             and executes callbacks when some of the resources are available.
69             Thus it is an implementation of the Observer pattern like L,
70             but the important difference is that L is B like C system call.
71              
72             Basic usage of L is as follows:
73              
74             =over
75              
76             =item 1.
77              
78             Register as many resources as you like by C method.
79              
80             A resource has its name and resource provider.
81             A resource provier is a subroutine reference that returns some data (or C if it's not available).
82              
83              
84             =item 2.
85              
86             Watch as many resources as you like by C method.
87              
88             When any of the watched resources gets available, a callback function is executed
89             with the available resource data.
90              
91             Note that if some of the watched resources is already available when calling C method,
92             it executes the callback function immediately.
93             That's because L is level-triggered.
94              
95              
96             =item 3.
97              
98             Notify the L object by C method that some of the registered resources have changed.
99              
100             The L object then checks if any of the triggered resources gets available.
101             If some resources become available, the callback function given by C method is executed.
102              
103              
104             =back
105              
106              
107             =head1 CLASS METHODS
108              
109              
110             =head2 $selector = Async::Selector->new();
111              
112             Creates an L object. It takes no parameters.
113              
114              
115             =cut
116              
117              
118             sub new {
119 68     68 1 48726 my ($class) = @_;
120 68         375 my $self = bless {
121             resources => {},
122             watchers => {},
123             }, $class;
124 68         220 return $self;
125             }
126              
127             sub _check {
128 552     552   1296 my ($self, $watcher_id_or_watcher, @triggers) = @_;
129 552         866 my %results = ();
130 552         720 my $fired = 0;
131 552         1451 my $watcher_entry = $self->{watchers}{"$watcher_id_or_watcher"};
132 552 50       1157 return 0 if not defined($watcher_entry);
133 552         718 my $watcher = $watcher_entry->{object};
134 552         1456 my %conditions = $watcher->conditions;
135 552 100       1659 if($watcher->get_check_all) {
136 96         232 @triggers = $watcher->resources;
137             }
138 552         1026 foreach my $res_key (@triggers) {
139 1100 100       2002 next if not defined $res_key;
140 1097 100       2156 next if not exists($conditions{$res_key});
141 889 100       2694 next if not defined($self->{resources}{$res_key});
142 764         984 my $input = $conditions{$res_key};
143 764         1988 my $result = $self->{resources}{$res_key}->($input);
144 764 100       4428 if(defined($result)) {
145 392         436 $fired = 1;
146 392         963 $results{$res_key} = $result;
147             }
148             }
149 552 100       1922 return 0 if !$fired;
150 287         848 $watcher_entry->{callback}->($watcher, %results);
151 287         20852 return 1;
152             }
153              
154             =pod
155              
156             =head1 OBJECT METHODS
157              
158             =head2 $selector->register($name => $provider->($condition_input), ...);
159              
160             Registers resources with the object.
161             A resource is described as a pair of resource name and resource provider.
162             You can register as many resources as you like.
163              
164             The resource name (C<$name>) is an arbitrary string.
165             It is used to select the resource in C method.
166             If C<$name> is already registered with C<$selector>,
167             the resource provider is updated with C<$provider> and the old one is discarded.
168              
169             The resource provider (C<$provider>) is a subroutine reference.
170             Its return value is supposed to be a scalar data of the resource if it's available,
171             or C if it's NOT available.
172              
173             C<$provider> subroutine takes a scalar argument (C<$condition_input>),
174             which is given by the user in arguments of C method.
175             C<$provider> can decide whether to provide the resource according to C<$condition_input>.
176              
177             C method returns C<$selector> object itself.
178              
179              
180             =cut
181              
182              
183             sub register {
184 60     60 1 22631 my ($self, %providers) = @_;
185 60         276 my @error_keys = ();
186 60         290 while(my ($key, $provider) = each(%providers)) {
187 112 100       260 if(!_isa_coderef($provider)) {
188 8         32 push(@error_keys, $key);
189             }
190             }
191 60 100       174 if(@error_keys) {
192 8         94 croak("Providers must be coderef for keys: " . join(",", @error_keys));
193 0         0 return;
194             }
195 52         139 @{$self->{resources}}{keys %providers} = values %providers;
  52         318  
196 52         242 return $self;
197             }
198              
199             =pod
200              
201             =head2 $selector->unregister($name, ...);
202              
203             Unregister resources from C<$selector> object.
204              
205             C<$name> is the name of the resource you want to unregister.
206             You can unregister as many resources as you like.
207              
208             C returns C<$selector> object itself.
209              
210             =cut
211              
212             sub unregister {
213 14     14 1 2714 my ($self, @names) = @_;
214 14         24 delete @{$self->{resources}}{grep { defined($_) } @names};
  14         60  
  28         66  
215 14         60 return $self;
216             }
217              
218              
219             =pod
220              
221             =head2 $watcher = $selector->watch($name => $condition_input, ..., $callback->($watcher, %resources));
222              
223             Starts to watch resources.
224             A watch is described as pairs of resource names and condition inputs for the resources.
225              
226             C<$name> is the resource name that you want to watch. It is the name given in C method.
227              
228             C<$condition_input> describes the condition the resource has to meet to be considered as "available".
229             C<$condition_input> is an arbitrary scalar, and its interpretation is up to the resource provider.
230              
231             You can list as many C<< $name => condition_input >> pairs as you like.
232              
233             C<$callback> is a subroutine reference that is executed when any of the watched resources gets available.
234             Its first argument C<$watcher> is an object of L which represents the watch you just made by C method.
235             This object is the same instance as the return value of C method.
236             The other argument (C<%resources>) is a hash whose keys are the available resource names and values are the corresponding resource data.
237             Note that C<$callback> is executed before C method returns
238             if some of the watched resources is already available.
239              
240             The return value of C<$callback> is just ignored by L.
241              
242             C method returns an object of L (C<$watcher>) which represents the watch you just made by C method.
243             C<$watcher> gives you various information such as the list of watched resources and whether the watcher is active or not.
244             See L for detail.
245              
246             The watcher created by C method is persistent in nature, i.e., it remains in the L object
247             and C<$callback> can be executed repeatedly. To cancel the watcher and release the C<$callback>,
248             call C<< $watcher->cancel() >> method.
249              
250             If no resource selection (C<< $name => $condition_input >> pair) is specified,
251             C method silently ignores it.
252             As a result, it returns a C<$watcher> object which is already canceled and inactive.
253              
254              
255             =head2 $watcher = $selector->watch_lt(...);
256              
257             C method is an alias for C method.
258              
259              
260             =head2 $watcher = $selector->watch_et(...);
261              
262             This method is just like C method but it emulates edge-triggered watch.
263              
264             To emulate edge-triggered behavior, C won't execute
265             the C<$callback> immediately even if some of the watched resources are available.
266             The C<$callback> is executed only when C method is called on
267             resources that are watched and available.
268              
269              
270             =cut
271              
272             sub _isa_coderef {
273 414     414   671 my ($coderef) = @_;
274 414   100     4016 return (defined($coderef) && defined(ref($coderef)) && ref($coderef) eq "CODE");
275             }
276              
277             sub watch_et {
278 246     246 1 1684 my $self = shift;
279 246         292 my (%conditions, $cb);
280 246         289 $cb = pop;
281 246 100       490 if(!_isa_coderef($cb)) {
282 6         82 croak "the watch callback must be a coderef.";
283             }
284 240         788 %conditions = @_;
285 240 100       1159 if(!%conditions) {
286 8         47 return Async::Selector::Watcher->new(
287             undef, \%conditions
288             );
289             }
290 232         1201 my $watcher = Async::Selector::Watcher->new(
291             $self, \%conditions
292             );
293 232         1324 $self->{watchers}{"$watcher"} = {
294             object => $watcher,
295             callback => $cb
296             };
297 232         614 return $watcher;
298             }
299              
300             sub watch_lt {
301 233     233 1 58196 my ($self, @args) = @_;
302 233         410 my $watcher;
303 233         883 $watcher = $self->watch_et(@args);
304 227 100       698 return $watcher if !$watcher->active;
305 221         748 $self->_check($watcher, $watcher->resources);
306 221         781 return $watcher;
307             }
308              
309             *watch = \&watch_lt;
310              
311             sub _wrapSelect {
312 56     56   221 my ($self, $method, $cb, %conditions) = @_;
313 56 100       176 if(!_isa_coderef($cb)) {
314 4         45 croak "the select callback must be a coderef.";
315             }
316             my $wrapped_cb = sub {
317 67     67   150 my ($w, %res) = @_;
318 67         175 foreach my $selected_resource ($w->resources) {
319 109 100       300 $res{$selected_resource} = undef if not exists($res{$selected_resource});
320             }
321 67 100       306 if($cb->("$w", %res)) {
322 27         4295 $w->cancel();
323             }
324 52         229 };
325 52         224 my $watcher = $self->$method(%conditions, $wrapped_cb);
326 52         177 $watcher->set_check_all(1);
327 52 100       139 return $watcher->active ? "$watcher" : undef;
328             }
329              
330             sub select_et {
331 5     5 0 1979 my ($self, @args) = @_;
332 5         20 return $self->_wrapSelect('watch_et', @args);
333             }
334              
335             sub select_lt {
336 51     51 0 13788 my ($self, @args) = @_;
337 51         151 return $self->_wrapSelect('watch_lt', @args);
338             }
339              
340             *select = \&select_lt;
341              
342             sub cancel {
343 182     182 0 17872 my ($self, @watchers) = @_;
344 182         378 foreach my $w (grep { defined($_) } @watchers) {
  194         665  
345 190 100       712 next if not exists $self->{watchers}{"$w"};
346 184         942 $self->{watchers}{"$w"}{object}->detach();
347 184         1444 delete $self->{watchers}{"$w"};
348             }
349 182         665 return $self;
350             }
351              
352             =pod
353              
354             =head2 $selector->trigger($name, ...);
355              
356             Notify C<$selector> that the resources specified by C<$name>s may be changed.
357              
358             C<$name> is the name of the resource that might have been changed.
359             You can specify as many C<$name>s as you like.
360              
361             Note that you may call C on resources that are not actually changed.
362             It is up to the resource provider to decide whether to provide the resource to watchers.
363              
364             C method returns C<$selector> object itself.
365              
366             =cut
367              
368             sub trigger {
369 342     342 1 279970 my ($self, @resources) = @_;
370 342 100       991 if(!@resources) {
371 3         12 return $self;
372             }
373 339         2018 foreach my $watcher ($self->watchers(@resources)) {
374 331         780 $self->_check($watcher, @resources);
375             }
376 339         1025 return $self;
377             }
378              
379             =pod
380              
381             =head2 @resouce_names = $selector->resources();
382              
383             Returns the list of registered resource names.
384              
385             =cut
386              
387             sub resources {
388 36     36 1 13809 my ($self) = @_;
389 36         51 return keys %{$self->{resources}};
  36         224  
390             }
391              
392             =pod
393              
394             =head2 $is_registered = $selector->registered($resource_name);
395              
396             Returns true if C<$resource_name> is registered with the L object.
397             Returns false otherwise.
398              
399             =cut
400              
401             sub registered {
402 28     28 1 13869 my ($self, $resource_name) = @_;
403 28 100       98 return 0 if not defined($resource_name);
404 26         182 return exists $self->{resources}{$resource_name};
405             }
406              
407              
408             =pod
409              
410              
411             =head2 @watchers = $selector->watchers([@resource_names]);
412              
413             Returns the list of active watchers (L objects) from the L object.
414              
415             If C method is called without argument, it returns all of the active watchers.
416              
417             If C method is called with some arguments (C<@resource_names>),
418             it returns active watchers that watch ANY resource out of C<@resource_names>.
419              
420             If you want watchers that watch ALL of C<@resource_names>,
421             try filtering the result (C<@watchers>) with L's C method.
422              
423             =cut
424              
425             sub watchers {
426 565     565 1 42792 my ($self, @resources) = @_;
427 565         744 my @all_watchers = map { $_->{object} } values %{$self->{watchers}};
  1125         2360  
  565         1500  
428 565 100       1863 if(!@resources) {
429 205         1075 return @all_watchers;
430             }
431 360         481 my @affected_watchers = ();
432 360         688 watcher_loop: foreach my $watcher (@all_watchers) {
433 838         2440 my %watch_conditions = $watcher->conditions;
434 838         1543 foreach my $res (@resources) {
435 1056 100       2274 next if !defined($res);
436 979 100       2512 if(exists($watch_conditions{$res})) {
437 503         623 push(@affected_watchers, $watcher);
438 503         1371 next watcher_loop;
439             }
440             }
441             }
442 360         1195 return @affected_watchers;
443             }
444              
445              
446             sub selections {
447 63     63 0 55866 my ($self) = @_;
448 63         247 return map { "$_" } $self->watchers;
  90         338  
449             }
450              
451              
452             =pod
453              
454             =head1 EXAMPLES
455              
456             =head2 Level-triggered vs. edge-triggered
457              
458             Watchers created by C and C methods are level-triggered.
459             This means their callbacks can be immediately executed if some of the watched resources
460             are already available.
461              
462             Watchers created by C method are edge-triggered.
463             This means their callbacks are never executed at the moment C is called.
464              
465             Both level-triggered and edge-triggered watcher callbacks are executed
466             when some of the watched resources are C-ed AND available.
467              
468              
469             my $selector = Async::Selector->new();
470             my $a = 10;
471             $selector->register(a => sub { my $t = shift; return $a >= $t ? $a : undef });
472              
473             ## Level-triggered watch
474             $selector->watch_lt(a => 5, sub { ## => LT: 10
475             my ($watcher, %res) = @_;
476             print "LT: $res{a}\n";
477             });
478             $selector->trigger('a'); ## => LT: 10
479             $a = 12;
480             $selector->trigger('a'); ## => LT: 12
481             $a = 3;
482             $selector->trigger('a'); ## Nothing happens because $a == 3 < 5.
483              
484             ## Edge-triggered watch
485             $selector->watch_et(a => 2, sub { ## Nothing happens because it's edge-triggered
486             my ($watcher, %res) = @_;
487             print "ET: $res{a}\n";
488             });
489             $selector->trigger('a'); ## => ET: 3
490             $a = 0;
491             $selector->trigger('a'); ## Nothing happens.
492             $a = 10;
493             $selector->trigger('a'); ## => LT: 10
494             ## => ET: 10
495              
496              
497              
498             =head2 Multiple resources, multiple watches
499              
500             You can register multiple resources with a single L
501             object. You can watch multiple resources with a single call of
502             C method. If you watch multiple resources, the callback is
503             executed when any of the watched resources is available.
504              
505              
506             my $selector = Async::Selector->new();
507             my $a = 5;
508             my $b = 6;
509             my $c = 7;
510             $selector->register(
511             a => sub { my $t = shift; return $a >= $t ? $a : undef },
512             b => sub { my $t = shift; return $b >= $t ? $b : undef },
513             c => sub { my $t = shift; return $c >= $t ? $c : undef },
514             );
515             $selector->watch(a => 10, sub {
516             my ($watcher, %res) = @_;
517             print "Select 1: a is $res{a}\n";
518             $watcher->cancel();
519             });
520             $selector->watch(
521             a => 12, b => 15, c => 15,
522             sub {
523             my ($watcher, %res) = @_;
524             foreach my $key (sort keys %res) {
525             print "Select 2: $key is $res{$key}\n";
526             }
527             $watcher->cancel();
528             }
529             );
530              
531             ($a, $b, $c) = (11, 14, 14);
532             $selector->trigger(qw(a b c)); ## -> Select 1: a is 11
533             print "---------\n";
534             ($a, $b, $c) = (12, 14, 20);
535             $selector->trigger(qw(a b c)); ## -> Select 2: a is 12
536             ## -> Select 2: c is 20
537              
538              
539             =head2 One-shot and persistent watches
540              
541             The watchers are persistent by default, that is, they remain in the
542             L object no matter how many times their callbacks
543             are executed.
544              
545             If you want to execute your callback just one time, call C<< $watcher->cancel() >>
546             in the callback.
547              
548              
549             my $selector = Async::Selector->new();
550             my $A = "";
551             my $B = "";
552             $selector->register(
553             A => sub { my $in = shift; return length($A) >= $in ? $A : undef },
554             B => sub { my $in = shift; return length($B) >= $in ? $B : undef },
555             );
556              
557             my $watcher_a = $selector->watch(A => 5, sub {
558             my ($watcher, %res) = @_;
559             print "A: $res{A}\n";
560             $watcher->cancel(); ## one-shot callback
561             });
562             my $watcher_b = $selector->watch(B => 5, sub {
563             my ($watcher, %res) = @_;
564             print "B: $res{B}\n";
565             ## persistent callback
566             });
567              
568             ## Trigger the resources.
569             ## Execution order of watcher callbacks is not guaranteed.
570             ($A, $B) = ('aaaaa', 'bbbbb');
571             $selector->trigger('A', 'B'); ## -> A: aaaaa
572             ## -> B: bbbbb
573             print "--------\n";
574             ## $watcher_a is already canceled.
575             ($A, $B) = ('AAAAA', 'BBBBB');
576             $selector->trigger('A', 'B'); ## -> B: BBBBB
577             print "--------\n";
578              
579             $B = "CCCCCCC";
580             $selector->trigger('A', 'B'); ## -> B: CCCCCCC
581             print "--------\n";
582              
583             $watcher_b->cancel();
584             $selector->trigger('A', 'B'); ## Nothing happens.
585              
586             =head2 Watcher aggregator
587              
588             Sometimes you might want to use multiple L objects
589             and watch their resources simultaneously.
590             In this case, you can use L to aggregate
591             watchers produced by L objects.
592             See L for details.
593              
594             my $selector_a = Async::Selector->new();
595             my $selector_b = Async::Selector->new();
596             my $A = "";
597             my $B = "";
598             $selector_a->register(resource => sub { my $in = shift; return length($A) >= $in ? $A : undef });
599             $selector_b->register(resource => sub { my $in = shift; return length($B) >= $in ? $B : undef });
600            
601             my $watcher_a = $selector_a->watch(resource => 5, sub {
602             my ($watcher, %res) = @_;
603             print "A: $res{resource}\n";
604             });
605             my $watcher_b = $selector_b->watch(resource => 5, sub {
606             my ($watcher, %res) = @_;
607             print "B: $res{resource}\n";
608             });
609            
610             ## Aggregates the two watchers into $aggregator
611             my $aggregator = Async::Selector::Aggregator->new();
612             $aggregator->add($watcher_a);
613             $aggregator->add($watcher_b);
614            
615             ## This cancels both $watcher_a and $watcher_b
616             $aggregator->cancel();
617            
618             print("watcher_a: " . ($watcher_a->active ? "active" : "inactive") . "\n"); ## -> watcher_a: inactive
619             print("watcher_b: " . ($watcher_b->active ? "active" : "inactive") . "\n"); ## -> watcher_b: inactive
620              
621              
622              
623             =head2 Real-time Web: Comet (long-polling) and WebSocket
624              
625             L can be used for foundation of so-called real-time
626             Web. Resource registered with an L object can be
627             pushed to Web browsers via Comet (long-polling) and/or WebSocket.
628              
629             See L for detail.
630              
631              
632             =head1 COMPATIBILITY
633              
634             The following methods that existed in L v0.02 or older are supported but not recommended
635             in this version.
636              
637             =over
638              
639             =item *
640              
641             C
642              
643             =item *
644              
645             C
646              
647             =item *
648              
649             C
650              
651             =item *
652              
653             C
654              
655             =item *
656              
657             C
658              
659             =back
660              
661             Currently the C methods are substituted for the C
662              
663             The differences between C and C
664              
665             =over
666              
667             =item *
668              
669             C methods take the watcher callback from the last argument, while C
670             take it from the first argument.
671              
672              
673             =item *
674              
675             C methods return L objects, while C
676             return selection IDs, which are strings.
677              
678             =item *
679              
680             The callback function for C receives L object from the
681             first argument, while the callback for C
682              
683             =item *
684              
685             The second argument for the callback function is also different.
686             For C methods, it is a hash of resources that are watched, triggered and available.
687             For C
688             for unavailable resources being C.
689              
690             =item *
691              
692             Return values from the callback function for C methods are ignored,
693             while those for C
694              
695              
696             =item *
697              
698             C method executes the callback for C methods when it triggers resources
699             that are watched and available.
700             On the other hand, C method executes the callback for C
701             resources that are watched, and some of the watched resources are available.
702             So if you trigger an unavailable watched resource and don't trigger any available watched resource,
703             the C
704              
705              
706              
707             =back
708              
709              
710             =head1 SEE ALSO
711              
712             L, L
713              
714              
715             =head1 AUTHOR
716              
717             Toshio Ito, C<< >>
718              
719             =head1 BUGS
720              
721             Please report any bugs or feature requests to C, or through
722             the web interface at L. I will be notified, and then you'll
723             automatically be notified of progress on your bug as I make changes.
724              
725              
726              
727             =head1 SUPPORT
728              
729             You can find documentation for this module with the perldoc command.
730              
731             perldoc Async::Selector
732              
733              
734             You can also look for information at:
735              
736             =over 4
737              
738             =item * RT: CPAN's request tracker (report bugs here)
739              
740             L
741              
742             =item * AnnoCPAN: Annotated CPAN documentation
743              
744             L
745              
746             =item * CPAN Ratings
747              
748             L
749              
750             =item * Search CPAN
751              
752             L
753              
754             =back
755              
756              
757              
758             =head1 LICENSE AND COPYRIGHT
759              
760             Copyright 2012-2013 Toshio Ito.
761              
762             This program is free software; you can redistribute it and/or modify it
763             under the terms of either: the GNU General Public License as published
764             by the Free Software Foundation; or the Artistic License.
765              
766             See http://dev.perl.org/licenses/ for more information.
767              
768              
769             =cut
770              
771             1; # End of Async::Selector