File Coverage

blib/lib/App/MultiModule/Core.pm
Criterion Covered Total %
statement 15 124 12.1
branch 0 40 0.0
condition 0 16 0.0
subroutine 5 21 23.8
pod 12 12 100.0
total 32 213 15.0


line stmt bran cond sub pod time code
1             package App::MultiModule::Core;
2             $App::MultiModule::Core::VERSION = '1.143160';
3 29     29   12181 use strict;use warnings;
  29     29   39  
  29         832  
  29         113  
  29         31  
  29         569  
4 29     29   99 use POE;
  29         33  
  29         141  
5 29     29   7912 use Storable;
  29         77  
  29         1212  
6 29     29   124 use IPC::Transit;
  29         31  
  29         39000  
7              
8             =head1 METHODS
9              
10             =cut
11             {
12             my $tags = {};
13              
14             sub _shutdown {
15             #http://poe.perl.org/?POE_FAQ/How_do_I_force_a_session_to_shut_down
16 0     0     my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
17 0           delete $heap->{wheel};
18 0 0         $kernel->alias_remove($heap->{alias}) if $heap->{alias};
19 0           $kernel->alarm_remove_all();
20 0           $kernel->refcount_decrement($session, 'my ref name');
21 0 0         $kernel->post($heap->{child_session}, 'shutdown') if $heap->{child_session};
22              
23 0           return;
24             }
25              
26             =head2 named_recur(%args)
27              
28             This is the preferred method to schedule recurring code in this framework.
29             Typically called from within set_config(), it automatically ensures
30             that only a single POE recurring event is setup, no matter how many times
31             named_recur() is called.
32              
33             Simply put: if your task has code that needs to run on an interval,
34             use this method to schedule it.
35              
36             The value in the 'recur_name' argument is used by this method to guard
37             against unwanted redundant scheduling of a code reference.
38              
39             That is, for all calls to named_recur inside a process space, there
40             will be one and only one scheduled event per unique value of the
41             argument 'recur_name'.
42              
43             This method takes all named arguments:
44              
45             =over 4
46              
47             =item recur_name (required) (process-globally unique string)
48              
49             Process global unique identifier for a recurring POE event.
50              
51             =item repeat_interval (required) (in seconds)
52              
53             How often the work should repeat.
54              
55             =item work (required) (CODE reference)
56              
57             The Perl code that is run on an interval
58              
59             =item tags (optional) (ARRAY reference of strings)
60              
61             The list of tags associated with this recurring work. These are referenced
62             by del_recurs() to deallocate scheduled POE events.
63              
64             =back
65              
66             Example: (copied from lib/MultiModuleTest/Example1.pm in this distribution)
67              
68             $self->named_recur(
69             recur_name => 'Example1',
70             repeat_interval => 1, #runs every second
71             work => sub {
72             my $message = {
73             ct => $self->{state}->{ct}++,
74             outstr => $config->{outstr},
75             };
76             $self->emit($message);
77             },
78             );
79              
80             =cut
81             sub named_recur {
82 0     0 1   my $self = shift;
83 0           my %args = @_;
84 0   0       my $recur_name = $args{recur_name} || 'none';
85 0 0         $App::MultiModule::Core::named_recur_times = {}
86             unless $App::MultiModule::Core::named_recur_times;
87 0           my $repeat_interval = $args{repeat_interval};
88 0 0 0       if( $self->{config} and
      0        
89             $self->{config}->{intervals} and
90             $self->{config}->{intervals}->{$args{recur_name}}) {
91 0           print STDERR "Setting repeat_interval for $args{recur_name} to " . $self->{config}->{intervals}->{$args{recur_name}} . " (default $repeat_interval)\n";
92 0           $repeat_interval = $self->{config}->{intervals}->{$args{recur_name}};
93             }
94 0 0         $App::MultiModule::Core::named_recur_times->{$args{recur_name}}
95             = $repeat_interval if $repeat_interval;
96 0 0         $self->{recurs} = {} unless $self->{recurs};
97 0 0         return 1 if $self->{recurs}->{$recur_name};
98 0           $self->{recurs}->{$recur_name} = 1;
99 0           return recur($self, %args);
100             }
101              
102             =head2 del_recurs($tag)
103              
104             Call this method to deallocate all of the previously scheduled POE
105             events that contain the passed $tag.
106              
107             NOTE NOTE NOTE
108              
109             Because of the way POE scheduling works, it is possible and likely that
110             a scheduled task could run one additional time AFTER del_recurs is called
111             on it.
112              
113             Example:
114             $self->del_recurs('some_tag');
115              
116             =cut
117             sub del_recurs {
118 0     0 1   my $self = shift;
119 0           my $tag = shift;
120 0           my %args = @_;
121 0 0         return unless $tags->{$tag};
122 0           foreach my $session_id (keys %{$tags->{$tag}}) {
  0            
123 0           POE::Kernel->post($session_id, 'shutdown', $session_id);
124             }
125 0           delete $tags->{$tag};
126             }
127              
128             =head2 get_tags
129              
130             Return a hash reference, keyed by tag, of all of the tags that have been
131             assigned to all of the currently scheduled POE events.
132              
133             See NOTE in del_recurs(): a call to get_tags() immediately after a call
134             to del_recurs() will NOT show the deleted tag, but it is possible
135             that one or more deleted scheduled events will run one additional time.
136              
137             Example:
138             foreach my $tag (keys %{$self->get_tags()}) {
139              
140             }
141              
142             =cut
143             sub get_tags {
144 0     0 1   return $tags;
145             }
146              
147             =head2 recur(%args)
148              
149             It is probably best to call named_recur().
150              
151             This method actually does all of the scheduling work, and is called
152             from named_recur(). However, named_recur() does the global named
153             uniqueness check, and this method does not. So if you call this method
154             directly, especially in set_config(), take care to not allow a build-up
155             of POE events.
156              
157             All of this method's arguments are the same as named_recur(), except
158             that recur_name is not used here to guard against duplicate scheduling.
159              
160             =cut
161             sub recur {
162 0     0 1   my $self = shift;
163 0           my %args = @_;
164              
165 0 0         $args{repeat_interval} = 300 unless $args{repeat_interval};
166 0 0   0     $args{work} = sub { print "Somebody forgot to pass work\n"; } unless $args{work};
  0            
167             $self->add_session(
168             { inline_states => {
169             _start => sub {
170 0     0     $_[HEAP]{alarm_id} = $_[KERNEL]->alarm_set(
171             party => time() + 1
172             );
173 0           $_[KERNEL]->delay(tick => 1);
174             },
175             tick => sub {
176 0     0     my $repeat_interval = $args{repeat_interval};
177 0 0         if($args{recur_name}) {
    0          
178 0           $repeat_interval = $App::MultiModule::Core::named_recur_times->{$args{recur_name}};
179             } elsif($args{override_repeat_interval}) {
180 0           my $r;
181 0           eval {
182 0           $r = $args{override_repeat_interval}->();
183             };
184 0 0         $repeat_interval = $r if $r;
185             # print STDERR "\$repeat_interval=$repeat_interval\n" if $r;
186             }
187 0           $_[KERNEL]->delay(tick => $repeat_interval);
188 0           &{$args{work}}(@_);
  0            
189             },
190             },
191             },
192 0           %args,
193             );
194             }
195              
196             =head2 add_session($session_def)
197              
198             =cut
199             sub add_session {
200 0     0 1   my $self = shift;
201 0           my $session_def = shift;
202 0           my %args = @_;
203 0   0       my $my_tags = $args{tags} || [$self->{task_name}];
204 0 0 0       die 'App::MultiModule::Core::add_sesion: passed argument "tags" must be an ARRAY reference'
205             if not ref $my_tags or ref $my_tags ne 'ARRAY';
206 0           push @{$my_tags}, $self->{task_name}
  0            
207 0 0         unless grep { /^$self->{task_name}$/ } @$my_tags;
208 0           $session_def->{inline_states}->{'shutdown'} = \&_shutdown;
209 0           my $session_id = POE::Session->create(%$session_def);
210 0           foreach my $tag (@{$my_tags}) {
  0            
211 0 0         $tags->{$tag} = {} unless $tags->{$tag};
212 0           $tags->{$tag}->{$session_id} = 1;
213             }
214             }
215             }
216              
217             {
218             my $get_info = sub {
219             my $file = shift;
220             my $has_message_method = 0;
221             my $has_set_config_method = 0;
222             my $is_stateful = 0;
223             eval {
224             open my $fh, '<', $file or die "failed to open $file: $!";
225             while(my $line = <$fh>) {
226             $has_message_method = 1 if $line =~ /^sub message/;
227             $has_set_config_method = 1 if $line =~ /^sub set_config/;
228             $is_stateful = 1 if $line =~ /^sub is_stateful/;
229             }
230             close $fh or die "failed to close $file: $!";
231             };
232             die "get_info: $@\n" if $@;
233             my $is_multimodule = $has_message_method;
234             return {
235             is_stateful => $is_stateful,
236             is_multimodule => $is_multimodule,
237             };
238             };
239              
240             =head2 get_multimodules_info
241              
242             This returns a hash reference that contains information about every
243             task that the MultiModule framework is aware of. 'aware of' is not
244             limited to running and/or loaded. A MultiModule task module that
245             exists in the configured search path, even though it is not referenced
246             or configured, will also be in this structure.
247              
248             The key to the return hash is the task name. The value is a reference
249             to a hash that contains a variety of fields:
250              
251             =over 4
252              
253             =item is_multimodule
254              
255             Always true at this point; this is a legacy field that will be removed
256              
257             =item is_stateful
258              
259             Has a true value if the referenced task is stateful.
260              
261             =item config
262              
263             Contains undef if there is no config currently available for the task.
264             Otherwise, this field contains the config for the task.
265              
266             =back
267              
268             NOTE NOTE NOTE
269              
270             At this time, calling this method from a task object will fail. It can
271             only be called from the 'root', MultiModule object.
272              
273             Example:
274             while(my($task_name, $task_info) =
275             each %{$root_object->get_multimodules_info}) {
276              
277             }
278              
279             =cut
280             sub get_multimodules_info {
281 0     0 1   my $self = shift;
282 0           my %args = @_;
283 0           my $module_prefixes = Storable::dclone($self->{module_prefixes});
284 0           my $hits = {};
285 0           foreach my $inc (@INC) {
286 0           foreach my $prefix (@$module_prefixes) {
287 0           $prefix =~ s/::/\//g;
288 0           my $path = "$inc/$prefix";
289 0           eval { #ignore everything...
290 0 0         die unless -d $path;
291 0 0         opendir(my $dh, $path) or die "can't opendir $path: $!\n";
292 0 0 0       foreach my $file (grep { not /^\./ and -f "$path/$_" and /\.pm$/ } readdir($dh)) {
  0            
293 0           my $info = $get_info->("$path/$file");
294 0           $file =~ s/\.pm$//;
295 0 0         if($info->{is_multimodule}) {
296 0           eval {
297 0           $info->{config} =
298             $self->{api}->get_task_config($file);
299             };
300 0           $hits->{$file} = $info;
301             }
302             }
303 0           closedir $dh;
304             }; #...really. Does that make me a terrible person?
305             }
306             }
307 0           return $hits;
308             }
309             }
310              
311             =head2 bucket($message)
312              
313             This method is called to send data to the monitoring/management subsystem
314             of MultiModule.
315              
316             =over 4
317              
318             =item task_name
319              
320             =item check_type
321              
322             =item cutoff_age
323              
324             =item min_points
325              
326             =item min_bucket_span
327              
328             =item bucket_name
329              
330             =item bucket_metric
331              
332             =item bucket_type
333              
334             =item value
335              
336             =back
337              
338             =cut
339             sub bucket {
340 0     0 1   my $self = shift;
341 0           my $message = shift;
342 0           $message->{is_bucket} = 1;
343 0           my %args = @_;
344 0           IPC::Transit::send(
345             qname => 'MultiModule',
346             message => $message,
347             override_local => 1
348             );
349             }
350              
351             =head1 OUT OF BAND METHODS
352              
353             The following methods are all a standardized interface to the
354             Out Of Band subsystem, which is fully documented in
355             perldoc App::MultiModule::Tasks::OutOfBand
356              
357             For all of the following methods (except send_oob()), the first,
358             required argument is meant to be a relatively short, human readable
359             'summary', as appropriate. Key/value pairs can optionally be passed in as
360             well, which are optionally accessible for viewing and/or filtering.
361              
362             =head2 log($logstr, %optional_extra_info)
363              
364             This method sends some information to the logging subsystem.
365              
366             Example:
367             $self->log('Something boring and relatively rare.', something => $else);
368             =cut
369             sub log {
370 0     0 1   my $self = shift;
371 0           my $str = shift;
372 0           my %args = @_;
373 0           $self->send_oob('log', {
374             args => \%args,
375             str => $str,
376             pid => $$,
377             });
378             }
379              
380             =head2 debug($debugstr, %optional_extra_info)
381              
382             This method sends some information to the debugging subsystem.
383              
384             Example:
385             $self->debug("This $thing might be of interest", something => $else)
386             if $self->{debug} > 2;
387              
388             =cut
389             sub debug {
390 0     0 1   my $self = shift;
391 0           my $str = shift;
392 0           my %args = @_;
393 0           $self->send_oob('debug', {
394             args => \%args,
395             str => $str,
396             pid => $$,
397             });
398             }
399              
400             =head2 alert($alertstr, %optional_extra_info)
401              
402             This method sends some information to the alerting subsystem.
403              
404             An alert() should always be 'actionable'. This is used by the
405             MultiModule internal monitoring infrastructure to communicate
406             resource violations, and when tasks are shutdown and failsafed.
407              
408             Example:
409             $self->alert("This $thing needs immediate attention", also => $this);
410             =cut
411             sub alert {
412 0     0 1   my $self = shift;
413 0           my $str = shift;
414 0           my %args = @_;
415 0           $self->send_oob('alert', {
416             args => \%args,
417             str => $str,
418             pid => $$,
419             });
420             }
421              
422             =head2 error($errorstr, %optional_extra_info)
423              
424             This method sends some information to the error subsystem.
425              
426             An error() should always be relevant, but it does not have to
427             be 'actionable'. MultiModule sends these, for example, if a
428             referenced task has a compile error or a run-time exception.
429              
430             Example:
431             $self->error('Something reasonably bad happened.', also => $this);
432             =cut
433             sub error {
434 0     0 1   my $self = shift;
435 0           my $str = shift;
436 0           my %args = @_;
437 0           $self->send_oob('error', {
438             args => \%args,
439             str => $str,
440             pid => $$,
441             });
442             }
443              
444             =head2 send_oob($oob_type, $oob_message)
445              
446             This is the method that log, error, debug and alert all call.
447             There can be any number of Out Of Band types. Think of each
448             'type' as a separate channel for messages that come out of MultiModule.
449             These channels are configurably handled. As mentioned before, see
450             perldoc App::MultiModule::Tasks::OutOfBand for full documentation.
451              
452             =over 4
453              
454             =item $oob_type A string that defines the OOB channel
455              
456             =item $oob_message A HASH reference that is sent through the OOB channel
457              
458             =back
459              
460             =cut
461             #The idea is that all OOB calls are sent locally to
462             #the OutOfBand task.
463              
464             #That task, if running externally, will re-send the message, non-
465             #locally, to the same queue, which will be picked up by the main
466             #OutOfBand task, which actually does the dirty work of
467             #handling the OOB message stream, but in a central process space.
468             sub send_oob {
469 0     0 1   my $self = shift;
470 0           my $type = shift;
471 0           my $message = shift;
472 0           $message->{type} = $type;
473 0           IPC::Transit::local_queue(qname => 'OutOfBand');
474 0           IPC::Transit::send(qname => 'OutOfBand', message => $message);
475             }
476              
477             1;