File Coverage

blib/lib/RxPerl/Subscription.pm
Criterion Covered Total %
statement 48 48 100.0
branch 26 28 92.8
condition 5 9 55.5
subroutine 9 9 100.0
pod 0 3 0.0
total 88 97 90.7


line stmt bran cond sub pod time code
1             package RxPerl::Subscription;
2 5     5   30 use strict;
  5         9  
  5         133  
3 5     5   25 use warnings;
  5         26  
  5         169  
4              
5 5     5   28 use Scalar::Util 'blessed', 'reftype', 'weaken';
  5         49  
  5         3397  
6              
7             our $VERSION = "v6.27.0";
8              
9             sub new {
10 2006     2006 0 3425 my ($class) = @_;
11              
12 2006         4336 my $self = {
13             # the 'subrefs' key will be created by autovivification
14             closed => 0,
15             subscribers => [],
16             };
17              
18 2006         5510 bless $self, $class;
19             }
20              
21             sub _execute_item {
22 8771     8771   13681 my ($self, $item) = @_;
23              
24 8771 100       19527 if (! defined $item) {
    50          
25 45         127 return undef;
26             } elsif (ref $item ne '') {
27 8726 100 66     42825 if (reftype $item eq 'CODE') {
    100 66        
    100 33        
    100          
    100          
    50          
28 1911         3841 $item->();
29             }
30             elsif (defined blessed($item) and $item->isa('RxPerl::Subscription')) {
31 905 100       2847 $item->unsubscribe unless $item eq $self;
32             }
33             elsif (reftype $item eq 'ARRAY' and not defined blessed($item)) {
34 3765         7964 $self->_execute_item($_) foreach @$item;
35 3765         8456 @$item = ();
36             }
37             elsif (reftype $item eq 'REF') {
38             # ref to ::Subscription object
39 8         29 $self->_execute_item($$item);
40 8         24 $$item = undef;
41             }
42             elsif (reftype $item eq 'SCALAR') {
43             # ref to undef, or some other invalid construct
44 1         3 return undef;
45             }
46             elsif (reftype $item eq 'HASH' and not defined blessed($item)) {
47 2136         7738 $self->_execute_item([values %$item]);
48 2136         13842 %$item = ();
49             }
50             }
51             }
52              
53             sub _add_to_subscribers {
54 2244     2244   3601 my ($self, $subscriber) = @_;
55              
56 2244         2877 push @{ $self->{subscribers} }, $subscriber;
  2244         3989  
57              
58 2244         6101 weaken($self->{subscribers}[-1]);
59              
60             # wrap 'complete' and 'error' of first subscriber
61 2244 100       2769 if ((grep defined, @{ $self->{subscribers} }) == 1) {
  2244         6318  
62 2006         3400 foreach (qw/ error complete /) {
63             # wrap with 'unsubscribe'
64 4012         6107 my $orig_fn = $subscriber->{$_};
65             $subscriber->{$_} = sub {
66 1876 100   1876   5868 $orig_fn->(@_) if defined $orig_fn;
67 1876         3757 $self->unsubscribe;
68             }
69 4012         12429 }
70             }
71             }
72              
73             sub add {
74 3296     3296 0 5666 my ($self, @subrefs) = @_;
75              
76             # filter out any non-refs
77 3296         6938 @subrefs = grep ref ne '', @subrefs;
78              
79 3296 100       6209 if (! $self->{closed}) {
80 2440         10265 $self->{subrefs}{$_} = $_ foreach @subrefs;
81             } else {
82 856         1660 $self->_execute_item(\@subrefs);
83             }
84             }
85              
86             sub unsubscribe {
87 2791     2791 0 5597 my ($self) = @_;
88              
89 2791 100       6857 return if $self->{closed}++;
90              
91             # no need for 'of' (or any other observable) to check 'closed status' anymore
92 1966         2620 foreach my $subscriber (@{ $self->{subscribers} }) {
  1966         3656  
93 2204 100       10182 delete @$subscriber{qw/ next error complete /} if defined $subscriber;
94             }
95              
96 1966         3918 $self->{subscribers} = [];
97              
98 1966         4572 $self->_execute_item(delete $self->{subrefs});
99             }
100              
101             1;