File Coverage

blib/lib/POE/Resource/Events.pm
Criterion Covered Total %
statement 136 150 90.6
branch 57 72 79.1
condition 22 31 70.9
subroutine 21 22 95.4
pod n/a
total 236 275 85.8


line stmt bran cond sub pod time code
1             # Data and accessors to manage POE's events.
2              
3             package POE::Resource::Events;
4              
5 176     176   737 use vars qw($VERSION);
  176         240  
  176         9119  
6             $VERSION = '1.366'; # NOTE - Should be #.### (three decimal places)
7              
8             # These methods are folded into POE::Kernel;
9             package POE::Kernel;
10              
11 176     176   766 use strict;
  176         242  
  176         288617  
12              
13             # A local copy of the queue so we can manipulate it directly.
14             my $kr_queue;
15              
16             my %event_count;
17             # ( $session_id => $count,
18             # ...,
19             # );
20              
21             my %post_count;
22             # ( $session_id => $count,
23             # ...,
24             # );
25              
26             ### Begin-run initialization.
27              
28             sub _data_ev_initialize {
29 176     176   312 my ($self, $queue) = @_;
30 176         480 $kr_queue = $queue;
31             }
32              
33             ### End-run leak checking.
34              
35             sub _data_ev_relocate_kernel_id {
36 4     4   15 my ($self, $old_id, $new_id) = @_;
37              
38 4 100       37 $event_count{$new_id} = delete $event_count{$old_id}
39             if exists $event_count{$old_id};
40 4 50       25 $post_count{$new_id} = delete $post_count{$old_id}
41             if exists $post_count{$old_id};
42             }
43              
44             sub _data_ev_finalize {
45 191     191   1086 my $finalized_ok = 1;
46 191         799 while (my ($ses_id, $cnt) = each(%event_count)) {
47 0         0 $finalized_ok = 0;
48 0         0 _warn("!!! Leaked event-to count: $ses_id = $cnt\n");
49             }
50              
51 191         1289 while (my ($ses_id, $cnt) = each(%post_count)) {
52 0         0 $finalized_ok = 0;
53 0         0 _warn("!!! Leaked event-from count: $ses_id = $cnt\n");
54             }
55 191         355 return $finalized_ok;
56             }
57              
58             ### Enqueue an event.
59              
60             sub FIFO_TIME_EPSILON () { 0.000001 }
61             my $last_fifo_time = monotime();
62              
63             sub _data_ev_enqueue {
64             my (
65 5415     5415   14588 $self,
66             $session, $source_session, $event, $type, $etc,
67             $file, $line, $fromstate, $time, $delta, $priority
68             ) = @_;
69              
70 5415         13453 my $sid = $session->ID;
71              
72 5414         6201 if (ASSERT_DATA) {
73             unless ($self->_data_ses_exists($sid)) {
74             _trap(
75             " can't enqueue event ``$event'' for nonexistent",
76             $self->_data_alias_loggable($sid)
77             );
78             }
79             }
80              
81             # This is awkward, but faster than using the fields individually.
82 5414         15075 my $event_to_enqueue = [ @_[(1+EV_SESSION) .. (1+EV_FROMSTATE)] ];
83 1227 100       2297 if( defined $time ) {
84 4195         12855 $event_to_enqueue->[EV_WALLTIME] = $time;
85 4195         7266 $event_to_enqueue->[EV_DELTA] = $delta;
86 1515   100     3026 $priority ||= wall2mono( $time + ($delta||0) );
      100        
87             }
88             else {
89 2726   66     6902 $priority ||= monotime();
90             }
91              
92 2734         10845 my $new_id;
93 3907         18609 my $old_head_priority = $kr_queue->get_next_priority();
94              
95 5414 100       7266 unless ($type & ET_MASK_DELAYED) {
96 4620 100       13923 $priority = $last_fifo_time + FIFO_TIME_EPSILON if $priority <= $last_fifo_time;
97 4620         9253 $last_fifo_time = $priority;
98             }
99              
100 3444         9114 $new_id = $kr_queue->enqueue($priority, $event_to_enqueue);
101 3444         5215 $event_to_enqueue->[EV_SEQ] = $new_id;
102              
103             #_carp( Carp::longmess( " priority is much to far in the future" ) ) if $priority > 1354569908;
104 5414         11539 if (TRACE_EVENTS ) {
105             _warn(
106             " enqueued event $new_id ``$event'' from ",
107             $self->_data_alias_loggable($source_session->ID), " to ",
108             $self->_data_alias_loggable($sid),
109             " at $time, priority=$priority"
110             );
111             }
112              
113 5414 100       12226 unless (defined $old_head_priority) {
    100          
114 4388         4593 $self->loop_resume_time_watcher($priority);
115             }
116             elsif ($priority < $old_head_priority) {
117 4730         15642 $self->loop_reset_time_watcher($priority);
118             }
119              
120             # This is the counterpart to _data_ev_refcount_dec(). It's only
121             # used in one place, so it's not in its own function.
122              
123 5053 100       24565 $self->_data_ses_refcount_inc($sid) unless $event_count{$sid}++;
124              
125 1887 100       5860 return $new_id if $sid eq $source_session->ID();
126              
127 1075 100       7364 $self->_data_ses_refcount_inc($source_session->ID) unless (
128             $post_count{$source_session->ID}++
129             );
130              
131 4286         15561 return $new_id;
132             }
133              
134             sub _data_ev_set
135             {
136 3377     0   303748 my( $self, $alarm_id, $my_alarm, $time, $pri, $delta ) = @_;
137              
138 193         33383 my $event = (
139 215         571 grep { $_->[1] == $alarm_id }
140             $kr_queue->peek_items( $my_alarm )
141             )[0];
142              
143 0 100       0 return unless $event;
144              
145 0         0 my $payload = $event->[ITEM_PAYLOAD];
146              
147             # XXX - However, if there has been a clock skew, the priority will
148             # have changed and we should recalculate priority from time+delta
149              
150 0 0 0     0 $delta = $payload->[EV_DELTA] || 0 unless defined $delta;
151 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $pri+$delta );
152 0         0 $payload->[EV_WALLTIME] = $time;
153 0         0 $payload->[EV_DELTA] = $delta;
154              
155 0   0     0 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      0        
156             }
157              
158             sub _data_ev_adjust
159             {
160 30     30   42 my( $self, $alarm_id, $my_alarm, $time, $delta ) = @_;
161              
162             # XXX - However, if there has been a clock skew, the priority will
163             # have changed and we should recalculate priority from time+delta
164 30 50       46 if( $time ) {
165             # PG - We are never invoked with $time anyway.
166 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $time+$delta );
167             }
168             else {
169 30         90 $kr_queue->adjust_priority( $alarm_id, $my_alarm, $delta );
170             }
171              
172 3470         3379 my $event = (
173 30         66 grep { $_->[1] == $alarm_id }
174             $kr_queue->peek_items( $my_alarm )
175             )[0];
176              
177 30 50       122 return unless $event;
178              
179 30         33 my $payload = $event->[ITEM_PAYLOAD];
180              
181 30 50       50 $payload->[EV_WALLTIME] = $time if $time;
182 30 50       81 $payload->[EV_DELTA] += $delta if $delta;
183              
184 30   50     204 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      100        
185             }
186              
187             ### Remove events sent to or from a specific session.
188              
189             sub _data_ev_clear_session {
190 792     792   1819 my ($self, $sid) = @_;
191              
192             # Events sent to the session.
193 792         1235 PENDING: {
194 792         888 my $pending_count = $event_count{$sid};
195 792 100       2020 last PENDING unless $pending_count;
196              
197 222         2057 foreach (
198             $kr_queue->remove_items(
199 244     244   1064 sub { $_[0][EV_SESSION]->ID() eq $sid },
200             $pending_count
201             )
202             ) {
203 227         857 $self->_data_ev_refcount_dec(
204 227         405 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
205             );
206 227         599 $pending_count--;
207             }
208              
209             # TODO - fork() can make this go negative on some systems.
210 222 50       1874 last PENDING unless $pending_count;
211              
212 0 0       0 croak "lingering pending count: $pending_count" if $pending_count;
213             }
214              
215             # Events sent by the session.
216             SENT: {
217 792         957 my $sent_count = $post_count{$sid};
  792         1111  
218 792 100       1726 last SENT unless $sent_count;
219              
220 3         48 foreach (
221             $kr_queue->remove_items(
222 3     3   12 sub { $_[0][EV_SOURCE]->ID() eq $sid },
223             $sent_count
224             )
225             ) {
226 3         10 $self->_data_ev_refcount_dec(
227 3         6 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
228             );
229 3         12 $sent_count--;
230             }
231              
232 3 50       26 last SENT unless $sent_count;
233              
234 0 0       0 croak "lingering sent count: $sent_count" if $sent_count;
235             }
236              
237 792 50       2107 croak "lingering event count" if delete $event_count{$sid};
238 792 50       2296 croak "lingering post count" if delete $post_count{$sid};
239             }
240              
241             # TODO Alarm maintenance functions may move out to a separate
242             # POE::Resource module in the future. Why? Because alarms may
243             # eventually be managed by something other than the event queue.
244             # Especially if we incorporate a proper Session scheduler. Be sure to
245             # move the tests to a corresponding t/res/*.t file.
246              
247             ### Remove a specific alarm by its name. This is in the events
248             ### section because alarms are currently implemented as events with
249             ### future due times.
250              
251             sub _data_ev_clear_alarm_by_name {
252 3549     3549   5530 my ($self, $sid, $alarm_name) = @_;
253              
254             my $my_alarm = sub {
255 798920 100   798920   1097254 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
256 796050 100       1072545 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
257 793110 100       2505696 return 0 unless $_[0]->[EV_NAME] eq $alarm_name;
258 1382         2910 return 1;
259 3549         17094 };
260              
261 3549         9796 foreach ($kr_queue->remove_items($my_alarm)) {
262 1382         1537 $self->_data_ev_refcount_dec(@{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]);
  1382         3869  
263             }
264             }
265              
266             ### Remove a specific alarm by its ID. This is in the events section
267             ### because alarms are currently implemented as events with future due
268             ### times. TODO It's possible to remove non-alarms; is that wrong?
269              
270             sub _data_ev_clear_alarm_by_id {
271 229     229   242 my ($self, $sid, $alarm_id) = @_;
272              
273             my $my_alarm = sub {
274 228     228   513 $_[0]->[EV_SESSION]->ID() eq $sid;
275 229         695 };
276              
277 229         596 my ($pri, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
278 229 100       475 return unless defined $pri;
279              
280 228         185 if (TRACE_EVENTS) {
281             _warn(
282             " removed event $id ``", $event->[EV_NAME], "'' to ",
283             $self->_data_alias_loggable($sid), " at $pri"
284             );
285             }
286              
287 228         712 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
288 228   100     977 my $time = $event->[EV_WALLTIME] + ($event->[EV_DELTA]||0);
289 228         863 return ($time, $event);
290             }
291              
292             ### Remove all the alarms for a session. Whoot!
293              
294             sub _data_ev_clear_alarm_by_session {
295 221     4   888 my ($self, $sid) = @_;
296              
297             my $my_alarm = sub {
298 30 100   30   56 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
299 26 100       46 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
300 5         11 return 1;
301 4         16 };
302              
303 4         6 my @removed;
304 4         13 foreach ($kr_queue->remove_items($my_alarm)) {
305 5         12 my ($pri, $event) = @$_[ITEM_PRIORITY, ITEM_PAYLOAD];
306 5         12 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
307 5   100     59 my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
      50        
308 5         9 push @removed, [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
  5         15  
309             }
310              
311 4         27 return @removed;
312             }
313              
314             ### Decrement a post refcount
315              
316             sub _data_ev_refcount_dec {
317 5267     5267   7740 my ($self, $source_session, $dest_session) = @_;
318              
319 5267         13894 my ($source_id, $dest_id) = ($source_session->ID, $dest_session->ID);
320              
321 5267         5219 if (ASSERT_DATA) {
322             _trap $dest_session unless exists $event_count{$dest_id};
323             }
324              
325 5267 100       14253 $self->_data_ses_refcount_dec($dest_id) unless --$event_count{$dest_id};
326              
327 5267 100       16877 return if $dest_id eq $source_id;
328              
329 4168         15529 if (ASSERT_DATA) {
330             _trap $source_session unless exists $post_count{$source_id};
331             }
332              
333 314 100       769 $self->_data_ses_refcount_dec($source_id) unless --$post_count{$source_id};
334             }
335              
336             ### Fetch the number of pending events sent to a session.
337              
338             sub _data_ev_get_count_to {
339 4970     4755   6284 my ($self, $sid) = @_;
340 4970   100     23392 return $event_count{$sid} || 0;
341             }
342              
343             ### Fetch the number of pending events sent from a session.
344              
345             sub _data_ev_get_count_from {
346 4753     4753   6488 my ($self, $sid) = @_;
347 4753   100     19392 return $post_count{$sid} || 0;
348             }
349              
350             ### Dispatch events that are due for "now" or earlier.
351              
352             sub _data_ev_dispatch_due {
353 2688     2688   4549 my $self = shift;
354              
355 2688         2872 if (TRACE_EVENTS) {
356 1622     2346   9897 foreach ($kr_queue->peek_items(sub { 1 })) {
357             my @event = map { defined() ? $_ : "(undef)" } @{$_->[ITEM_PAYLOAD]};
358             _warn(
359             " time($_->[ITEM_PRIORITY]) id($_->[ITEM_ID]) ",
360             "event(@event)\n"
361             );
362             }
363             }
364              
365 2688         13604 my $now = monotime();
366 3852         7109 my $next_time;
367 3852   100     10235 while (
368             defined($next_time = $kr_queue->get_next_priority()) and
369             $next_time <= $now
370             ) {
371 22960         40013 my ($priority, $id, $event) = $kr_queue->dequeue_next();
372              
373 4146         7387 if (TRACE_EVENTS) {
374             _warn(" dispatching event $id ($event->[EV_NAME])");
375             }
376              
377             # TODO - Why can't we reverse these two lines?
378             # TODO - Reversing them could avoid entering and removing GC marks.
379 4146         36503 $self->_data_ev_refcount_dec($event->[EV_SOURCE], $event->[EV_SESSION]);
380              
381 2982 100       12264 if ($event->[EV_TYPE] & (ET_SIGNAL | ET_SIGDIE)) {
382 1462         1751 $self->_dispatch_signal_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  1462         5150  
383             }
384             else {
385 3142         6407 $self->_dispatch_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  3142         8598  
386             }
387              
388             # Stop the system if an unhandled exception occurred.
389             # This wipes out all sessions and associated resources.
390 3411 100       15221 next unless $POE::Kernel::kr_exception;
391 1637         9231 POE::Kernel->stop();
392             }
393              
394             # Sweep for dead sessions. The sweep may alter the next queue time.
395              
396 3117         9910 $self->_data_ses_gc_sweep();
397 1697         4516 $next_time = $kr_queue->get_next_priority();
398              
399             # Tell the event loop to wait for the next event, if there is one.
400             # Otherwise we're going to wait indefinitely for some other event.
401              
402 1697 100       4111 if (defined $next_time) {
403 2432         4851 $self->loop_reset_time_watcher($next_time);
404             }
405             else {
406 1903         8198 $self->loop_pause_time_watcher();
407             }
408             }
409              
410             1;
411              
412             __END__