File Coverage

blib/lib/Test/Stream/Hub.pm
Criterion Covered Total %
statement 207 207 100.0
branch 73 82 89.0
condition 56 84 66.6
subroutine 31 31 100.0
pod 17 23 73.9
total 384 427 89.9


line stmt bran cond sub pod time code
1             package Test::Stream::Hub;
2 109     109   1081 use strict;
  109         189  
  109         2655  
3 109     109   507 use warnings;
  109         169  
  109         2837  
4              
5 109     109   522 use Carp qw/carp croak/;
  109         177  
  109         5516  
6 109     109   57005 use Test::Stream::State;
  109         256  
  109         3227  
7 109     109   618 use Test::Stream::Util qw/get_tid/;
  109         197  
  109         761  
8              
9 109     109   558 use Scalar::Util qw/weaken/;
  109         191  
  109         6593  
10              
11             use Test::Stream::HashBase(
12 109         664 accessors => [qw{
13             pid tid hid ipc
14             state
15             no_ending
16             _todo _meta parent_todo
17             _mungers
18             _listeners
19             _follow_ups
20             _formatter
21             _context_init
22             _context_release
23             }],
24 109     109   555 );
  109         197  
25              
26             my $ID_POSTFIX = 1;
27             sub init {
28 520     520 0 1090 my $self = shift;
29              
30 520         2160 $self->{+PID} = $$;
31 520         1372 $self->{+TID} = get_tid();
32 520         2722 $self->{+HID} = join '-', $self->{+PID}, $self->{+TID}, $ID_POSTFIX++;
33              
34 520         1294 $self->{+_TODO} = [];
35 520         1276 $self->{+_META} = {};
36              
37 520   33     4934 $self->{+STATE} ||= Test::Stream::State->new;
38              
39 520 100       1726 if (my $formatter = delete $self->{formatter}) {
40 5         18 $self->format($formatter);
41             }
42              
43 520 100       2103 if (my $ipc = $self->{+IPC}) {
44 136         996 $ipc->add_hub($self->{+HID});
45             }
46             }
47              
48             sub inherit {
49 240     240 1 439 my $self = shift;
50 240         484 my ($from, %params) = @_;
51              
52             $self->{+_FORMATTER} = $from->{+_FORMATTER}
53 240 100 33     1437 unless $self->{+_FORMATTER} || exists($params{formatter});
54              
55 240 100 66     2125 if ($from->{+IPC} && !$self->{+IPC} && !exists($params{ipc})) {
      100        
56 232         504 my $ipc = $from->{+IPC};
57 232         744 $self->{+IPC} = $ipc;
58 232         1079 $ipc->add_hub($self->{+HID});
59             }
60              
61 240 100       1053 if (my $ls = $from->{+_LISTENERS}) {
62 221         374 push @{$self->{+_LISTENERS}} => grep { $_->{inherit} } @$ls;
  221         967  
  294         866  
63             }
64              
65 240 100       1315 if (my $ms = $from->{+_MUNGERS}) {
66 12         15 push @{$self->{+_MUNGERS}} => grep { $_->{inherit} } @$ms;
  12         48  
  6         27  
67             }
68             }
69              
70             sub debug_todo {
71 3983     3983 0 10950 my ($self) = @_;
72 3983         6900 my $array = $self->{+_TODO};
73 3983   100     12527 pop @$array while @$array && !defined $array->[-1];
74             return (
75             parent_todo => $self->{+PARENT_TODO},
76 3983 100       33190 todo => @$array ? ${$array->[-1]} : undef,
  17         136  
77             )
78             }
79              
80             sub meta {
81 7     7 1 27 my $self = shift;
82 7         15 my ($key, $default) = @_;
83              
84 7 100       208 croak "Invalid key '" . (defined($key) ? $key : '(UNDEF)') . "'"
    100          
85             unless $key;
86              
87 5         8 my $exists = $self->{+_META}->{$key};
88 5 100 100     30 return undef unless $default || $exists;
89              
90 4 100       15 $self->{+_META}->{$key} = $default unless $exists;
91              
92 4         18 return $self->{+_META}->{$key};
93             }
94              
95             sub delete_meta {
96 3     3 1 17 my $self = shift;
97 3         5 my ($key) = @_;
98              
99 3 100       180 croak "Invalid key '" . (defined($key) ? $key : '(UNDEF)') . "'"
    100          
100             unless $key;
101              
102 1         4 delete $self->{+_META}->{$key};
103             }
104              
105             sub set_todo {
106 22     22 1 91 my $self = shift;
107 22         50 my ($reason) = @_;
108              
109 22 100       73 unless (defined wantarray) {
110 1         101 carp "set_todo(...) called in void context, todo not set!";
111 1         3 return;
112             }
113              
114 21 100       63 unless(defined $reason) {
115 1         95 carp "set_todo() called with undefined argument, todo not set!";
116 1         7 return;
117             }
118              
119 20         38 my $ref = \$reason;
120 20         34 push @{$self->{+_TODO}} => $ref;
  20         57  
121 20         80 weaken($self->{+_TODO}->[-1]);
122 20         60 return $ref;
123             }
124              
125             sub get_todo {
126 6     6 1 26 my $self = shift;
127 6         9 my $array = $self->{+_TODO};
128 6   100     42 pop @$array while @$array && !defined($array->[-1]);
129 6 100       28 return undef unless @$array;
130 3         4 return ${$array->[-1]};
  3         20  
131             }
132              
133             sub format {
134 597     597 1 1060 my $self = shift;
135              
136 597         1127 my $old = $self->{+_FORMATTER};
137 597 100       2281 ($self->{+_FORMATTER}) = @_ if @_;
138              
139 597         3538 return $old;
140             }
141              
142             sub is_local {
143 316     316 0 492 my $self = shift;
144             return $$ == $self->{+PID}
145 316   33     2998 && get_tid() == $self->{+TID};
146             }
147              
148             sub listen {
149 680     680 1 1150 my $self = shift;
150 680         1423 my ($sub, %params) = @_;
151              
152             carp "Useless addition of a listener in a child process or thread!"
153 680 50 33     4604 if $$ != $self->{+PID} || get_tid() != $self->{+TID};
154              
155 680 100 66     3965 croak "listen only takes coderefs for arguments, got '$sub'"
156             unless ref $sub && ref $sub eq 'CODE';
157              
158 679         914 push @{$self->{+_LISTENERS}} => { %params, code => $sub };
  679         2775  
159              
160 679         2111 $sub; # Intentional return.
161             }
162              
163             sub unlisten {
164 313     313 1 570 my $self = shift;
165              
166             carp "Useless removal of a listener in a child process or thread!"
167 313 50 33     2009 if $$ != $self->{+PID} || get_tid() != $self->{+TID};
168              
169 313         660 my %subs = map {$_ => $_} @_;
  313         1483  
170              
171 313         518 @{$self->{+_LISTENERS}} = grep { !$subs{$_->{code}} } @{$self->{+_LISTENERS}};
  313         2820  
  650         2000  
  313         729  
172             }
173              
174             sub munge {
175 10     10 1 46 my $self = shift;
176 10         20 my ($sub, %params) = @_;
177              
178             carp "Useless addition of a munger in a child process or thread!"
179 10 50 33     109 if $$ != $self->{+PID} || get_tid() != $self->{+TID};
180              
181 10 100 66     185 croak "munge only takes coderefs for arguments, got '$sub'"
182             unless ref $sub && ref $sub eq 'CODE';
183              
184 9         13 push @{$self->{+_MUNGERS}} => { %params, code => $sub };
  9         42  
185              
186 9         31 $sub; # Intentional Return
187             }
188              
189             sub unmunge {
190 1     1 1 9 my $self = shift;
191             carp "Useless removal of a munger in a child process or thread!"
192 1 50 33     12 if $$ != $self->{+PID} || get_tid() != $self->{+TID};
193 1         3 my %subs = map {$_ => $_} @_;
  1         5  
194 1         3 @{$self->{+_MUNGERS}} = grep { !$subs{$_->{code}} } @{$self->{+_MUNGERS}};
  1         5  
  2         7  
  1         2  
195             }
196              
197             sub follow_up {
198 38     38 0 114 my $self = shift;
199 38         100 my ($sub) = @_;
200              
201             carp "Useless addition of a follow-up in a child process or thread!"
202 38 50 33     392 if $$ != $self->{+PID} || get_tid() != $self->{+TID};
203              
204 38 100 66     729 croak "follow_up only takes coderefs for arguments, got '$sub'"
205             unless ref $sub && ref $sub eq 'CODE';
206              
207 36         82 push @{$self->{+_FOLLOW_UPS}} => $sub;
  36         176  
208             }
209              
210             sub add_context_init {
211 1     1 1 9 my $self = shift;
212 1         3 my ($sub) = @_;
213              
214 1 50 33     9 croak "add_context_init only takes coderefs for arguments, got '$sub'"
215             unless ref $sub && ref $sub eq 'CODE';
216              
217 1         2 push @{$self->{+_CONTEXT_INIT}} => $sub;
  1         9  
218              
219 1         4 $sub; # Intentional return.
220             }
221              
222             sub remove_context_init {
223 1     1 1 10 my $self = shift;
224 1         4 my %subs = map {$_ => $_} @_;
  1         6  
225 1         4 @{$self->{+_CONTEXT_INIT}} = grep { !$subs{$_} == $_ } @{$self->{+_CONTEXT_INIT}};
  1         6  
  1         7  
  1         3  
226             }
227              
228             sub add_context_release {
229 1     1 1 7 my $self = shift;
230 1         3 my ($sub) = @_;
231              
232 1 50 33     14 croak "add_context_release only takes coderefs for arguments, got '$sub'"
233             unless ref $sub && ref $sub eq 'CODE';
234              
235 1         2 push @{$self->{+_CONTEXT_RELEASE}} => $sub;
  1         3  
236              
237 1         3 $sub; # Intentional return.
238             }
239              
240             sub remove_context_release {
241 1     1 1 6 my $self = shift;
242 1         3 my %subs = map {$_ => $_} @_;
  1         7  
243 1         3 @{$self->{+_CONTEXT_RELEASE}} = grep { !$subs{$_} == $_ } @{$self->{+_CONTEXT_RELEASE}};
  1         4  
  1         4  
  1         4  
244             }
245              
246             sub send {
247 3599     3599 1 7834 my $self = shift;
248 3599         5357 my ($e) = @_;
249              
250 3599   100     10392 my $ipc = $self->{+IPC} || return $self->process($e);
251              
252 3500 100       11666 if($e->global) {
253 25         106 $ipc->send('GLOBAL', $e);
254 25         108 return $self->process($e);
255             }
256              
257             return $ipc->send($self->{+HID}, $e)
258 3475 100 66     19902 if $$ != $self->{+PID} || get_tid() != $self->{+TID};
259              
260 3469         9959 $self->process($e);
261             }
262              
263             sub process {
264 3598     3598 1 5272 my $self = shift;
265 3598         5042 my ($e) = @_;
266              
267 3598 100       8704 if ($self->{+_MUNGERS}) {
268 12         21 for (@{$self->{+_MUNGERS}}) {
  12         37  
269 14         45 $_->{code}->($self, $e);
270 14 100       205 return unless $e;
271             }
272             }
273              
274 3589         5223 my $state = $self->{+STATE};
275 3589         11265 $e->update_state($state);
276 3589         11788 my $count = $state->count;
277              
278 3589 100       19976 $self->{+_FORMATTER}->write($e, $count) if $self->{+_FORMATTER};
279              
280 3589 100       11502 if ($self->{+_LISTENERS}) {
281 2023         2579 $_->{code}->($self, $e, $count) for @{$self->{+_LISTENERS}};
  2023         9654  
282             }
283              
284 3589         12074 my $code = $e->terminate;
285 3589 100       8347 $self->terminate($code, $e) if defined $code;
286              
287 3563         16962 return $e;
288             }
289              
290             sub terminate {
291 3     3 0 7 my $self = shift;
292 3         6 my ($code) = @_;
293 3         48 exit($code);
294             }
295              
296             sub cull {
297 813     813 1 1356 my $self = shift;
298              
299 813   100     2606 my $ipc = $self->{+IPC} || return;
300 791 50 33     4931 return if $self->{+PID} != $$ || $self->{+TID} != get_tid();
301              
302             # No need to do IPC checks on culled events
303 791         3493 $self->process($_) for $ipc->cull($self->{+HID});
304             }
305              
306             sub finalize {
307 348     348 0 5255 my $self = shift;
308 348         657 my ($dbg, $do_plan) = @_;
309              
310 348         1160 $self->cull();
311 348         1290 my $state = $self->{+STATE};
312              
313 348         1661 my $plan = $state->plan;
314 348         1272 my $count = $state->count;
315 348         2041 my $failed = $state->failed;
316              
317             # return if NOTHING was done.
318 348 100 100     2345 return unless $do_plan || defined($plan) || $count || $failed;
      100        
      100        
319              
320 336 100       1267 unless ($state->ended) {
321 334 100       2180 if ($self->{+_FOLLOW_UPS}) {
322 33         76 $_->($dbg, $self) for reverse @{$self->{+_FOLLOW_UPS}};
  33         251  
323             }
324              
325             # These need to be refreshed now
326 333         1250 $plan = $state->plan;
327 333         1141 $count = $state->count;
328 333         1732 $failed = $state->failed;
329              
330 333 100 100     3728 if (($plan && $plan eq 'NO PLAN') || ($do_plan && !$plan)) {
      100        
      66        
331 316         2609 $self->send(
332             Test::Stream::Event::Plan->new(
333             debug => $dbg,
334             max => $count,
335             )
336             );
337 316         1760 $plan = $state->plan;
338             }
339             }
340              
341 335         1442 $state->finish($dbg->frame);
342             }
343              
344             sub DESTROY {
345 411     411   1213 my $self = shift;
346 411   100     1685 my $ipc = $self->{+IPC} || return;
347 379 100       1558 return unless $$ == $self->{+PID};
348 378 50       1062 return unless get_tid() == $self->{+TID};
349              
350 378         1349 local $?;
351 378         1836 $ipc->drop_hub($self->{+HID});
352             }
353              
354             1;
355              
356             __END__