File Coverage

blib/lib/RxPerl/Subject.pm
Criterion Covered Total %
statement 49 51 96.0
branch 12 22 54.5
condition n/a
subroutine 11 12 91.6
pod 0 4 0.0
total 72 89 80.9


line stmt bran cond sub pod time code
1             package RxPerl::Subject;
2 5     5   36 use strict;
  5         15  
  5         210  
3 5     5   24 use warnings;
  5         10  
  5         288  
4              
5 5     5   30 use base 'RxPerl::Observable';
  5         9  
  5         481  
6              
7 5     5   3135 use Hash::Ordered;
  5         21710  
  5         217  
8 5     5   34 use Scalar::Util 'weaken';
  5         8  
  5         3599  
9              
10             our $VERSION = "v6.29.8";
11              
12             # over-rideable
13             # sub _on_subscribe {
14             # my ($self, $subscriber) = @_;
15             # ...
16             # }
17              
18             # over-rideable
19             # sub _on_subscribe_closed {
20             # my ($self, $subscriber) = @_;
21             # ...
22             # }
23              
24             sub new {
25 25     25 0 377648 my ($class) = @_;
26              
27 25         89 my $subscribers_oh = Hash::Ordered->new();
28 25         360 weaken(my $w_subscribers_oh = $subscribers_oh);
29              
30 25         42 my $w_self;
31             my $self = $class->SUPER::new(sub {
32 45     45   110 my ($subscriber) = @_;
33              
34 45 100       112 if ($w_self->{_closed}) {
35 1 50       11 $w_self->_on_subscribe_closed($subscriber) if $w_self->can('_on_subscribe_closed');
36 1         4 my ($type, @args) = @{ $w_self->{_closed} };
  1         3  
37 1 50       6 $subscriber->{$type}->(@args) if defined $subscriber->{$type};
38 1         3 return;
39             }
40              
41 44         216 $w_subscribers_oh->set("$subscriber", $subscriber);
42 44 100       939 $w_self->_on_subscribe($subscriber) if $w_self->can('_on_subscribe');
43              
44 44         107 my $string = "$subscriber";
45             # return;
46             return sub {
47 4 50       16 $w_subscribers_oh and $w_subscribers_oh->delete($string);
48 44         303 };
49 25         303 });
50 25         56 weaken($w_self = $self);
51              
52 25         79 $self->{_closed} = 0;
53 25         55 foreach my $type (qw/ error complete /) {
54             $self->{$type} = sub {
55 5 50   5   16 return if $w_self->{_closed};
56 5         14 $w_self->{_closed} = [$type, @_];
57 5         16 foreach my $subscriber ($subscribers_oh->values) {
58 4 50       43 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
59             }
60 5         51 $subscribers_oh->clear();
61             # TODO: maybe: delete @$self{qw/ next error complete /};
62             # (Think about how subclasses such as BehaviorSubjects will be affected)
63 50         395 };
64             }
65             $self->{next} = sub {
66 32     32   102 foreach my $subscriber ($subscribers_oh->values) {
67 47 50       521 $subscriber->{next}->(@_) if defined $subscriber->{next};
68             }
69 25         83 };
70              
71 25         79 return $self;
72             }
73              
74             sub next {
75 30     30 0 365 my $self = shift;
76              
77 30 50       170 $self->{next}->(splice @_, 0, 1) if defined $self->{next};
78             }
79              
80             sub error {
81 0     0 0 0 my $self = shift;
82              
83 0 0       0 $self->{error}->(splice @_, 0, 1) if defined $self->{error};
84             }
85              
86             sub complete {
87 4     4 0 30 my $self = shift;
88              
89 4 50       19 $self->{complete}->() if defined $self->{complete};
90             }
91              
92             1;