File Coverage

blib/lib/Mojo/Rx/ConnectableObservable.pm
Criterion Covered Total %
statement 15 36 41.6
branch 0 4 0.0
condition n/a
subroutine 5 9 55.5
pod 0 2 0.0
total 20 51 39.2


line stmt bran cond sub pod time code
1             package Mojo::Rx::ConnectableObservable;
2 2     2   14 use strict;
  2         5  
  2         65  
3 2     2   9 use warnings FATAL => 'all';
  2         4  
  2         67  
4              
5 2     2   10 use base 'Mojo::Rx::Observable';
  2         5  
  2         156  
6              
7 2     2   12 use Mojo::Rx::Subscription;
  2         4  
  2         58  
8              
9 2     2   12 use Scalar::Util 'weaken';
  2         3  
  2         751  
10              
11             our $VERSION = "v0.13.0";
12              
13             sub new {
14 0     0 0   my ($class, $source, $subject_factory) = @_;
15              
16 0           my $weak_self;
17             my $self = $class->SUPER::new(sub {
18 0     0     my ($subscriber) = @_;
19              
20 0           return $weak_self->{_subject}->subscribe($subscriber);
21 0           });
22 0           weaken($weak_self = $self);
23              
24 0           %$self = (
25             %$self,
26             _source => $source,
27             _subject_factory => $subject_factory,
28             _subject => $subject_factory->(),
29             _connected => 0,
30             _subjects_subscription => undef,
31             );
32              
33 0           return $self;
34             }
35              
36             sub connect {
37 0     0 0   my ($self) = @_;
38              
39 0 0         return $self->{_subjects_subscription} if $self->{_connected};
40              
41 0           $self->{_connected} = 1;
42              
43 0           $self->{_subjects_subscription} = Mojo::Rx::Subscription->new;
44 0           weaken(my $weak_self = $self);
45             $self->{_subjects_subscription}->add_dependents(sub {
46 0 0   0     if (defined $weak_self) {
47 0           $weak_self->{_connected} = 0;
48 0           $weak_self->{_subjects_subscription} = undef;
49 0           $weak_self->{_subject} = $weak_self->{_subject_factory}->();
50             }
51 0           });
52              
53             $self->{_source}->subscribe({
54             new_subscription => $self->{_subjects_subscription},
55 0           %{ $self->{_subject} },
  0            
56             });
57              
58 0           return $self->{_subjects_subscription};
59             }
60              
61             1;