File Coverage

blib/lib/POE/Resource/Events.pm
Criterion Covered Total %
statement 135 149 90.6
branch 55 70 78.5
condition 24 34 70.5
subroutine 21 22 95.4
pod n/a
total 235 275 85.4


line stmt bran cond sub pod time code
1             # Data and accessors to manage POE's events.
2              
3             package POE::Resource::Events;
4              
5 200     200   1044 use vars qw($VERSION);
  200         314  
  200         11146  
6             $VERSION = '1.370'; # NOTE - Should be #.### (three decimal places)
7              
8             # These methods are folded into POE::Kernel;
9             package POE::Kernel;
10              
11 200     200   906 use strict;
  200         278  
  200         365777  
12              
13             # A local copy of the queue so we can manipulate it directly.
14             my $kr_queue;
15              
16             my %event_count;
17             # ( $session_id => $count,
18             # ...,
19             # );
20              
21             my %post_count;
22             # ( $session_id => $count,
23             # ...,
24             # );
25              
26             ### Begin-run initialization.
27              
28             sub _data_ev_initialize {
29 200     200   409 my ($self, $queue) = @_;
30 200         397 $kr_queue = $queue;
31             }
32              
33             ### End-run leak checking.
34              
35             sub _data_ev_relocate_kernel_id {
36 10     10   65 my ($self, $old_id, $new_id) = @_;
37              
38             $event_count{$new_id} = delete $event_count{$old_id}
39 10 100       117 if exists $event_count{$old_id};
40             $post_count{$new_id} = delete $post_count{$old_id}
41 10 50       61 if exists $post_count{$old_id};
42             }
43              
44             sub _data_ev_finalize {
45 204     204   1474 my $finalized_ok = 1;
46 204         742 while (my ($ses_id, $cnt) = each(%event_count)) {
47 0         0 $finalized_ok = 0;
48 0         0 _warn("!!! Leaked event-to count: $ses_id = $cnt\n");
49             }
50              
51 204         697 while (my ($ses_id, $cnt) = each(%post_count)) {
52 0         0 $finalized_ok = 0;
53 0         0 _warn("!!! Leaked event-from count: $ses_id = $cnt\n");
54             }
55 204         586 return $finalized_ok;
56             }
57              
58             ### Enqueue an event.
59              
60             sub FIFO_TIME_EPSILON () { 0.000001 }
61             my $last_fifo_time = monotime();
62              
63             sub _data_ev_enqueue {
64             my (
65 5404     5404   22400 $self,
66             $session, $source_session, $event, $type, $etc,
67             $file, $line, $fromstate, $time, $delta, $priority
68             ) = @_;
69              
70 5404         12954 my $sid = $session->ID;
71              
72 5403         6896 if (ASSERT_DATA) {
73             unless ($self->_data_ses_exists($sid)) {
74             _trap(
75             " can't enqueue event ``$event'' for nonexistent",
76             $self->_data_alias_loggable($sid)
77             );
78             }
79             }
80              
81             # This is awkward, but faster than using the fields individually.
82 5403         15282 my $event_to_enqueue = [ @_[(1+EV_SESSION) .. (1+EV_FROMSTATE)] ];
83 1201 100       2980 if( defined $time ) {
84 4210         12424 $event_to_enqueue->[EV_WALLTIME] = $time;
85 4210         6844 $event_to_enqueue->[EV_DELTA] = $delta;
86 1515   100     3088 $priority ||= wall2mono( $time + ($delta||0) );
      100        
87             }
88             else {
89 2700   66     7129 $priority ||= monotime();
90             }
91              
92 2708         8130 my $new_id;
93 3896         17535 my $old_head_priority = $kr_queue->get_next_priority();
94              
95 5403 100       7797 unless ($type & ET_MASK_DELAYED) {
96 4677 100       16525 $priority = $last_fifo_time + FIFO_TIME_EPSILON if $priority <= $last_fifo_time;
97 4677         8562 $last_fifo_time = $priority;
98             }
99              
100 3431         8008 $new_id = $kr_queue->enqueue($priority, $event_to_enqueue);
101 3431         6686 $event_to_enqueue->[EV_SEQ] = $new_id;
102              
103             #_carp( Carp::longmess( " priority is much to far in the future" ) ) if $priority > 1354569908;
104 5403         11792 if (TRACE_EVENTS ) {
105             _warn(
106             " enqueued event $new_id ``$event'' from ",
107             $self->_data_alias_loggable($source_session->ID), " to ",
108             $self->_data_alias_loggable($sid),
109             " at $time, priority=$priority"
110             );
111             }
112              
113 5403 100 66     12282 unless (defined $old_head_priority) {
114 4444         5342 $self->loop_resume_time_watcher($priority);
115             }
116             elsif ($priority < $old_head_priority) {
117             $self->loop_reset_time_watcher($priority);
118             }
119              
120             # This is the counterpart to _data_ev_refcount_dec(). It's only
121             # used in one place, so it's not in its own function.
122              
123 5391 100       14031 $self->_data_ses_refcount_inc($sid) unless $event_count{$sid}++;
124              
125 5061 100       20065 return $new_id if $sid eq $source_session->ID();
126              
127             $self->_data_ses_refcount_inc($source_session->ID) unless (
128 1583 100       6459 $post_count{$source_session->ID}++
129             );
130              
131 4297         14007 return $new_id;
132             }
133              
134             sub _data_ev_set
135             {
136 3393     0   13646 my( $self, $alarm_id, $my_alarm, $time, $pri, $delta ) = @_;
137              
138             my $event = (
139 215         504 grep { $_->[1] == $alarm_id }
  193         1007  
140             $kr_queue->peek_items( $my_alarm )
141             )[0];
142              
143 0 100       0 return unless $event;
144              
145 0         0 my $payload = $event->[ITEM_PAYLOAD];
146              
147             # XXX - However, if there has been a clock skew, the priority will
148             # have changed and we should recalculate priority from time+delta
149              
150 0 0 0     0 $delta = $payload->[EV_DELTA] || 0 unless defined $delta;
151 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $pri+$delta );
152 0         0 $payload->[EV_WALLTIME] = $time;
153 0         0 $payload->[EV_DELTA] = $delta;
154              
155 0   0     0 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      0        
156             }
157              
158             sub _data_ev_adjust
159             {
160 30     30   59 my( $self, $alarm_id, $my_alarm, $time, $delta ) = @_;
161              
162             # XXX - However, if there has been a clock skew, the priority will
163             # have changed and we should recalculate priority from time+delta
164 30 50       52 if( $time ) {
165             # PG - We are never invoked with $time anyway.
166 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $time+$delta );
167             }
168             else {
169 30         75 $kr_queue->adjust_priority( $alarm_id, $my_alarm, $delta );
170             }
171              
172             my $event = (
173 30         67 grep { $_->[1] == $alarm_id }
  3470         3646  
174             $kr_queue->peek_items( $my_alarm )
175             )[0];
176              
177 30 50       104 return unless $event;
178              
179 30         47 my $payload = $event->[ITEM_PAYLOAD];
180              
181 30 50       40 $payload->[EV_WALLTIME] = $time if $time;
182 30 50       54 $payload->[EV_DELTA] += $delta if $delta;
183              
184 30   50     180 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      100        
185             }
186              
187             ### Remove events sent to or from a specific session.
188              
189             sub _data_ev_clear_session {
190 818     818   2767 my ($self, $sid) = @_;
191              
192             # Events sent to the session.
193             PENDING: {
194 818         1024 my $pending_count = $event_count{$sid};
  818         1413  
195 818 100       2154 last PENDING unless $pending_count;
196              
197 235         2401 foreach (
198             $kr_queue->remove_items(
199 257     257   1155 sub { $_[0][EV_SESSION]->ID() eq $sid },
200             $pending_count
201             )
202             ) {
203             $self->_data_ev_refcount_dec(
204 240         487 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
  240         2355  
205             );
206 240         587 $pending_count--;
207             }
208              
209             # TODO - fork() can make this go negative on some systems.
210 235 50       1919 last PENDING unless $pending_count;
211              
212 0 0       0 croak "lingering pending count: $pending_count" if $pending_count;
213             }
214              
215             # Events sent by the session.
216             SENT: {
217 818         1036 my $sent_count = $post_count{$sid};
  818         1440  
218 818 100       2003 last SENT unless $sent_count;
219              
220 3         20 foreach (
221             $kr_queue->remove_items(
222 3     3   11 sub { $_[0][EV_SOURCE]->ID() eq $sid },
223             $sent_count
224             )
225             ) {
226             $self->_data_ev_refcount_dec(
227 3         7 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
  3         14  
228             );
229 3         7 $sent_count--;
230             }
231              
232 3 50       23 last SENT unless $sent_count;
233              
234 0 0       0 croak "lingering sent count: $sent_count" if $sent_count;
235             }
236              
237 818 50       2098 croak "lingering event count" if delete $event_count{$sid};
238 818 50       2094 croak "lingering post count" if delete $post_count{$sid};
239             }
240              
241             # TODO Alarm maintenance functions may move out to a separate
242             # POE::Resource module in the future. Why? Because alarms may
243             # eventually be managed by something other than the event queue.
244             # Especially if we incorporate a proper Session scheduler. Be sure to
245             # move the tests to a corresponding t/res/*.t file.
246              
247             ### Remove a specific alarm by its name. This is in the events
248             ### section because alarms are currently implemented as events with
249             ### future due times.
250              
251             sub _data_ev_clear_alarm_by_name {
252 3514     3514   6964 my ($self, $sid, $alarm_name) = @_;
253              
254             my $my_alarm = sub {
255 798823 100   798823   941646 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
256 795991 100       935029 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
257 793062 100       1479569 return 0 unless $_[0]->[EV_NAME] eq $alarm_name;
258 1382         2160 return 1;
259 3514         17048 };
260              
261 3514         9989 foreach ($kr_queue->remove_items($my_alarm)) {
262 1382         1692 $self->_data_ev_refcount_dec(@{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]);
  1382         2958  
263             }
264             }
265              
266             ### Remove a specific alarm by its ID. This is in the events section
267             ### because alarms are currently implemented as events with future due
268             ### times. TODO It's possible to remove non-alarms; is that wrong?
269              
270             sub _data_ev_clear_alarm_by_id {
271 229     229   289 my ($self, $sid, $alarm_id) = @_;
272              
273             my $my_alarm = sub {
274 228     228   410 $_[0]->[EV_SESSION]->ID() eq $sid;
275 229         565 };
276              
277 229         452 my ($pri, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
278 229 100       377 return unless defined $pri;
279              
280 228         198 if (TRACE_EVENTS) {
281             _warn(
282             " removed event $id ``", $event->[EV_NAME], "'' to ",
283             $self->_data_alias_loggable($sid), " at $pri"
284             );
285             }
286              
287 228         575 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
288 228   100     750 my $time = $event->[EV_WALLTIME] + ($event->[EV_DELTA]||0);
289 228         569 return ($time, $event);
290             }
291              
292             ### Remove all the alarms for a session. Whoot!
293              
294             sub _data_ev_clear_alarm_by_session {
295 221     4   798 my ($self, $sid) = @_;
296              
297             my $my_alarm = sub {
298 30 100   30   48 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
299 26 100       34 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
300 5         10 return 1;
301 4         15 };
302              
303 4         9 my @removed;
304 4         11 foreach ($kr_queue->remove_items($my_alarm)) {
305 5         12 my ($pri, $event) = @$_[ITEM_PRIORITY, ITEM_PAYLOAD];
306 5         13 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
307 5   100     44 my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
      50        
308 5         8 push @removed, [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
  5         17  
309             }
310              
311 4         30 return @removed;
312             }
313              
314             ### Decrement a post refcount
315              
316             sub _data_ev_refcount_dec {
317 5241     5241   9184 my ($self, $source_session, $dest_session) = @_;
318              
319 5241         12896 my ($source_id, $dest_id) = ($source_session->ID, $dest_session->ID);
320              
321 5241         6466 if (ASSERT_DATA) {
322             _trap $dest_session unless exists $event_count{$dest_id};
323             }
324              
325 5241 100       12734 $self->_data_ses_refcount_dec($dest_id) unless --$event_count{$dest_id};
326              
327 5241 100       15004 return if $dest_id eq $source_id;
328              
329 4164         12002 if (ASSERT_DATA) {
330             _trap $source_session unless exists $post_count{$source_id};
331             }
332              
333 310 100       691 $self->_data_ses_refcount_dec($source_id) unless --$post_count{$source_id};
334             }
335              
336             ### Fetch the number of pending events sent to a session.
337              
338             sub _data_ev_get_count_to {
339 5023     4808   8401 my ($self, $sid) = @_;
340 5023   100     23976 return $event_count{$sid} || 0;
341             }
342              
343             ### Fetch the number of pending events sent from a session.
344              
345             sub _data_ev_get_count_from {
346 4806     4806   8315 my ($self, $sid) = @_;
347 4806   100     16618 return $post_count{$sid} || 0;
348             }
349              
350             ### Dispatch events that are due for "now" or earlier.
351              
352             sub _data_ev_dispatch_due {
353 2614     2614   5455 my $self = shift;
354              
355 2614         3348 if (TRACE_EVENTS) {
356 1623     2368   9674 foreach ($kr_queue->peek_items(sub { 1 })) {
357             my @event = map { defined() ? $_ : "(undef)" } @{$_->[ITEM_PAYLOAD]};
358             _warn(
359             " time($_->[ITEM_PRIORITY]) id($_->[ITEM_ID]) ",
360             "event(@event)\n"
361             );
362             }
363             }
364              
365 2614         14372 my $now = monotime();
366 3786         6326 my $next_time;
367 3786   100     10185 while (
368             defined($next_time = $kr_queue->get_next_priority()) and
369             $next_time <= $now
370             ) {
371 23118         39014 my ($priority, $id, $event) = $kr_queue->dequeue_next();
372              
373 4128         9639 if (TRACE_EVENTS) {
374             _warn(" dispatching event $id ($event->[EV_NAME])");
375             }
376              
377             # TODO - Why can't we reverse these two lines?
378             # TODO - Reversing them could avoid entering and removing GC marks.
379 4128         28435 $self->_data_ev_refcount_dec($event->[EV_SOURCE], $event->[EV_SESSION]);
380              
381 2956 100       11432 if ($event->[EV_TYPE] & (ET_SIGNAL | ET_SIGDIE)) {
382 1484         3477 $self->_dispatch_signal_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  1484         5192  
383             }
384             else {
385 3095         6585 $self->_dispatch_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  3095         8608  
386             }
387              
388             # Stop the system if an unhandled exception occurred.
389             # This wipes out all sessions and associated resources.
390 3372 100       14259 next unless $POE::Kernel::kr_exception;
391 1638         7426 POE::Kernel->stop();
392             }
393              
394             # Sweep for dead sessions. The sweep may alter the next queue time.
395              
396 3030         10392 $self->_data_ses_gc_sweep();
397 1609         3415 $next_time = $kr_queue->get_next_priority();
398              
399             # Tell the event loop to wait for the next event, if there is one.
400             # Otherwise we're going to wait indefinitely for some other event.
401              
402 1609 100       19433 if (defined $next_time) {
403 2317         4750 $self->loop_reset_time_watcher($next_time);
404             }
405             else {
406 1932         7242 $self->loop_pause_time_watcher();
407             }
408             }
409              
410             1;
411              
412             __END__