|  line  | 
 stmt  | 
 bran  | 
 cond  | 
 sub  | 
 pod  | 
 time  | 
 code  | 
| 
1
 | 
  
 
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 package RxPerl::Observable;  | 
| 
2
 | 
5
 | 
 
 | 
 
 | 
  
5
  
 | 
 
 | 
25
 | 
 use strict;  | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
9
 | 
    | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
129
 | 
    | 
| 
3
 | 
5
 | 
 
 | 
 
 | 
  
5
  
 | 
 
 | 
20
 | 
 use warnings;  | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
7
 | 
    | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
94
 | 
    | 
| 
4
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
5
 | 
5
 | 
 
 | 
 
 | 
  
5
  
 | 
 
 | 
1570
 | 
 use RxPerl::Subscription;  | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
10
 | 
    | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
131
 | 
    | 
| 
6
 | 
5
 | 
 
 | 
 
 | 
  
5
  
 | 
 
 | 
1528
 | 
 use RxPerl::Subscriber;  | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
11
 | 
    | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
120
 | 
    | 
| 
7
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
8
 | 
5
 | 
 
 | 
 
 | 
  
5
  
 | 
 
 | 
26
 | 
 use Scalar::Util 'reftype';  | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
7
 | 
    | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
173
 | 
    | 
| 
9
 | 
5
 | 
 
 | 
 
 | 
  
5
  
 | 
 
 | 
25
 | 
 use Carp 'croak';  | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
5
 | 
    | 
| 
 
 | 
5
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1565
 | 
    | 
| 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
11
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # an observable is something you can subscribe to.  | 
| 
12
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
13
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # The class RxPerl::Observable has a method 'new'  | 
| 
14
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   (arguments) This method accepts a function as an argument.  | 
| 
15
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #                   This function:  | 
| 
16
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #                       - accepts a subscriber as its only argument  | 
| 
17
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #                       - calls $subscriber->next,error,complete at its appropriate moments  | 
| 
18
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #                       - returns a subref, which contains the cleanup required, when the subscriber wishes to unsubscribe  | 
| 
19
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   (return)    This method returns an instance of the RxPerl::Observable  | 
| 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #                   This RxPerl::Observable instance contains:  | 
| 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #                       - the function  | 
| 
22
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
23
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Objects of the RxPerl::Observable class have a 'subscribe' method  | 
| 
24
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   (arguments) This method accepts zero to three arguments, which should be converted by the subscribe method to a clean hashref ('the subscriber') with the corresponding 0-3 keys  | 
| 
25
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   (body)      This method calls the $function that RxPerl::Observable->new received as argument (and that initiates the subscription)  | 
| 
26
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 #   (return)    This method returns a new RxPerl::Subscription object, that contains the "cleanup subref" returned by $function  | 
| 
27
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
28
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 our $VERSION = "v6.27.1";  | 
| 
29
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
30
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub new {  | 
| 
31
 | 
2120
 | 
 
 | 
 
 | 
  
2120
  
 | 
  
0
  
 | 
2948
 | 
     my ($class, $function) = @_;  | 
| 
32
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
33
 | 
2120
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3194
 | 
     my $self = {function => $function};  | 
| 
34
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
35
 | 
2120
 | 
 
 | 
 
 | 
 
 | 
 
 | 
5574
 | 
     bless $self, $class;  | 
| 
36
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
37
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
38
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub subscribe {  | 
| 
39
 | 
2243
 | 
 
 | 
 
 | 
  
2243
  
 | 
  
0
  
 | 
4350
 | 
     my ($self, @args) = @_;  | 
| 
40
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
41
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2640
 | 
     my $subscriber = {};  | 
| 
42
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2794
 | 
     bless $subscriber, 'RxPerl::Subscriber';  | 
| 
43
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
44
 | 
2243
 | 
  
100
  
 | 
  
 50
  
 | 
 
 | 
 
 | 
5812
 | 
     if ((reftype($args[0]) // '') eq 'HASH') {  | 
| 
45
 | 
2194
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
4180
 | 
         $args[0]{_subscription} = delete $args[0]{new_subscription} if $args[0]{new_subscription};  | 
| 
46
 | 
2194
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2732
 | 
         @$subscriber{qw/ next error complete _subscription /} = @{ $args[0] }{qw/ next error complete _subscription /};  | 
| 
 
 | 
2194
 | 
 
 | 
 
 | 
 
 | 
 
 | 
5017
 | 
    | 
| 
47
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     } else {  | 
| 
48
 | 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
111
 | 
         @$subscriber{qw/ next error complete /} = @args;  | 
| 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
50
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
51
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     $subscriber->{error} //= sub {  | 
| 
52
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
 
 | 
0
 | 
         my ($err) = @_;  | 
| 
53
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
54
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         # TODO: shouldn't croak immediately, to be like rxjs, but on the next tick  | 
| 
55
 | 
0
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
         croak $err;  | 
| 
56
 | 
2243
 | 
 
 | 
  
100
  
 | 
 
 | 
 
 | 
3939
 | 
     };  | 
| 
57
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
58
 | 
2243
 | 
 
 | 
  
 66
  
 | 
 
 | 
 
 | 
4711
 | 
     my $subscription = $subscriber->{_subscription} //= RxPerl::Subscription->new;  | 
| 
59
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3092
 | 
     $subscriber->{closed_ref} = \$subscription->{closed};  | 
| 
60
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
61
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # don't continue if the subscription has already closed (complete/error)  | 
| 
62
 | 
2243
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
3418
 | 
     return $subscription if $subscription->{closed};  | 
| 
63
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
64
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4725
 | 
     $subscription->_add_to_subscribers($subscriber);  | 
| 
65
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
66
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2864
 | 
     my $fn = $self->{function};  | 
| 
67
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
68
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
3990
 | 
     my @cbs = $fn->($subscriber);  | 
| 
69
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
70
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
4764
 | 
     $subscription->add(@cbs);  | 
| 
71
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
72
 | 
2243
 | 
 
 | 
 
 | 
 
 | 
 
 | 
9272
 | 
     return $subscription;  | 
| 
73
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
74
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
75
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub pipe {  | 
| 
76
 | 
908
 | 
 
 | 
 
 | 
  
908
  
 | 
  
0
  
 | 
1407
 | 
     my ($self, @operators) = @_;  | 
| 
77
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
78
 | 
908
 | 
 
 | 
 
 | 
 
 | 
 
 | 
960
 | 
     my $result = $self;  | 
| 
79
 | 
908
 | 
 
 | 
 
 | 
 
 | 
 
 | 
1877
 | 
     $result = $_->($result) foreach @operators;  | 
| 
80
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
81
 | 
908
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2385
 | 
     return $result;  | 
| 
82
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
83
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
84
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 1;  |