File Coverage

blib/lib/RxPerl/Subscription.pm
Criterion Covered Total %
statement 56 56 100.0
branch 30 32 93.7
condition 10 15 66.6
subroutine 10 10 100.0
pod 0 3 0.0
total 106 116 91.3


line stmt bran cond sub pod time code
1             package RxPerl::Subscription;
2 5     5   53 use strict;
  5         10  
  5         195  
3 5     5   25 use warnings;
  5         10  
  5         344  
4              
5 5     5   38 use Scalar::Util 'blessed', 'reftype', 'weaken';
  5         36  
  5         5092  
6              
7             our $VERSION = "v6.29.8";
8              
9             sub new {
10 2128     2128 0 3400 my ($class) = @_;
11              
12 2128         4918 my $self = {
13             # the 'subrefs' key will be created by autovivification
14             closed => 0,
15             subscribers => [],
16             };
17              
18 2128         5577 bless $self, $class;
19             }
20              
21             sub _execute_item {
22 9296     9296   13316 my ($self, $item) = @_;
23              
24 9296 100       19182 if (! defined $item) {
    50          
25 52         188 return undef;
26             } elsif (ref $item ne '') {
27 9244 100 66     34244 if (reftype $item eq 'CODE') {
    100 66        
    100 33        
    100          
    100          
    50          
28 2013         4080 $item->();
29             }
30             elsif (defined blessed($item) and $item->isa('RxPerl::Subscription')) {
31 966 100       3052 $item->unsubscribe unless $item eq $self;
32             }
33             elsif (reftype $item eq 'ARRAY' and not defined blessed($item)) {
34 3994         7547 $self->_execute_item($_) foreach @$item;
35 3994         8805 @$item = ();
36             }
37             elsif (reftype $item eq 'REF') {
38             # ref to ::Subscription object
39 9         29 $self->_execute_item($$item);
40 9         26 $$item = undef;
41             }
42             elsif (reftype $item eq 'SCALAR') {
43             # ref to undef, or some other invalid construct
44 1         3 return undef;
45             }
46             elsif (reftype $item eq 'HASH' and not defined blessed($item)) {
47 2261         6847 $self->_execute_item([values %$item]);
48 2261         11883 %$item = ();
49             }
50             }
51             }
52              
53             sub _add_to_subscribers {
54 2390     2390   3686 my ($self, $subscriber) = @_;
55              
56 2390         2928 push @{ $self->{subscribers} }, $subscriber;
  2390         4072  
57              
58 2390         4096 weaken($self->{subscribers}[-1]);
59              
60             # wrap 'complete' and 'error' of first subscriber
61 2390 100       2854 if ((grep defined, @{ $self->{subscribers} }) == 1) {
  2390         6232  
62 2128         3619 foreach (qw/ error complete /) {
63             # wrap with 'unsubscribe'
64 4256         6111 my $orig_fn = $subscriber->{$_};
65             $subscriber->{$_} = sub {
66 1991 100   1991   6273 $orig_fn->(@_) if defined $orig_fn;
67 1991         5034 $self->unsubscribe;
68             }
69 4256         13396 }
70             }
71             }
72              
73             sub _cleanup_base_subrefs {
74 2592     2592   3568 my ($self) = @_;
75              
76 2592 100       4990 $self->{subrefs} or return;
77              
78 510         662 foreach my $key (keys %{ $self->{subrefs} }) {
  510         1205  
79 829         1181 my $item = $self->{subrefs}{$key};
80 829 100 66     1995 if (defined blessed($item) and $item->isa('RxPerl::Subscription') and $item->{closed}) {
      100        
81 3         7 delete $self->{subrefs}{$key};
82             }
83             }
84             }
85              
86             sub add {
87 3512     3512 0 5547 my ($self, @subrefs) = @_;
88              
89             # filter out any non-refs
90 3512         6897 @subrefs = grep ref ne '', @subrefs;
91              
92 3512 100       6108 if (! $self->{closed}) {
93 2592         5125 $self->_cleanup_base_subrefs;
94 2592         10039 $self->{subrefs}{$_} = $_ foreach @subrefs;
95             } else {
96 920         1860 $self->_execute_item(\@subrefs);
97             }
98             }
99              
100             sub unsubscribe {
101 2959     2959 0 6270 my ($self) = @_;
102              
103 2959 100       7037 return if $self->{closed}++;
104              
105             # no need for 'of' (or any other observable) to check 'closed status' anymore
106 2088         2632 foreach my $subscriber (@{ $self->{subscribers} }) {
  2088         3674  
107 2350 100       11903 delete @$subscriber{qw/ next error complete /} if defined $subscriber;
108             }
109              
110 2088         4050 $self->{subscribers} = [];
111              
112 2088         4176 $self->_execute_item(delete $self->{subrefs});
113             }
114              
115             1;