File Coverage

blib/lib/RxPerl/Observable.pm
Criterion Covered Total %
statement 42 44 95.4
branch 5 6 83.3
condition 5 8 62.5
subroutine 9 10 90.0
pod 0 3 0.0
total 61 71 85.9


line stmt bran cond sub pod time code
1             package RxPerl::Observable;
2 5     5   48 use strict;
  5         8  
  5         188  
3 5     5   24 use warnings;
  5         21  
  5         327  
4              
5 5     5   2637 use RxPerl::Subscription;
  5         16  
  5         199  
6 5     5   2465 use RxPerl::Subscriber;
  5         21  
  5         203  
7              
8 5     5   87 use Scalar::Util 'reftype';
  5         22  
  5         315  
9 5     5   31 use Carp 'croak';
  5         8  
  5         2862  
10              
11             # an observable is something you can subscribe to.
12              
13             # The class RxPerl::Observable has a method 'new'
14             # (arguments) This method accepts a function as an argument.
15             # This function:
16             # - accepts a subscriber as its only argument
17             # - calls $subscriber->next,error,complete at its appropriate moments
18             # - returns a subref, which contains the cleanup required, when the subscriber wishes to unsubscribe
19             # (return) This method returns an instance of the RxPerl::Observable
20             # This RxPerl::Observable instance contains:
21             # - the function
22              
23             # Objects of the RxPerl::Observable class have a 'subscribe' method
24             # (arguments) This method accepts zero to three arguments, which should be converted by the subscribe method to a clean hashref ('the subscriber') with the corresponding 0-3 keys
25             # (body) This method calls the $function that RxPerl::Observable->new received as argument (and that initiates the subscription)
26             # (return) This method returns a new RxPerl::Subscription object, that contains the "cleanup subref" returned by $function
27              
28             our $VERSION = "v6.29.8";
29              
30             sub new {
31 2261     2261 0 330564 my ($class, $function) = @_;
32              
33 2261         3797 my $self = {function => $function};
34              
35 2261         7267 bless $self, $class;
36             }
37              
38             sub subscribe {
39 2390     2390 0 5755 my ($self, @args) = @_;
40              
41 2390         3076 my $subscriber = {};
42 2390         3785 bless $subscriber, 'RxPerl::Subscriber';
43              
44 2390 100 50     6459 if ((reftype($args[0]) // '') eq 'HASH') {
45 2336 100       5122 $args[0]{_subscription} = delete $args[0]{new_subscription} if $args[0]{new_subscription};
46 2336         3195 @$subscriber{qw/ next error complete _subscription /} = @{ $args[0] }{qw/ next error complete _subscription /};
  2336         7017  
47             } else {
48 54         181 @$subscriber{qw/ next error complete /} = @args;
49             }
50              
51             $subscriber->{error} //= sub {
52 0     0   0 my ($err) = @_;
53              
54             # TODO: shouldn't croak immediately, to be like rxjs, but on the next tick. Also is bad because
55             # TODO: it won't call the @cbs
56 0         0 croak $err;
57 2390   66     4794 };
58              
59 2390   66     5779 my $subscription = $subscriber->{_subscription} //= RxPerl::Subscription->new;
60 2390         4032 $subscriber->{closed_ref} = \$subscription->{closed};
61              
62             # don't continue if the subscription has already closed (complete/error)
63 2390 50       4216 return $subscription if $subscription->{closed};
64              
65 2390         5642 $subscription->_add_to_subscribers($subscriber);
66              
67 2390         3431 my $fn = $self->{function};
68              
69 2390         5482 my @cbs = $fn->($subscriber);
70              
71 2390         5961 $subscription->add(@cbs);
72              
73 2390         13409 return $subscription;
74             }
75              
76             sub pipe {
77 960     960 0 1693 my ($self, @operators) = @_;
78              
79 960         1264 my $result = $self;
80 960         2086 $result = $_->($result) foreach @operators;
81              
82 960         3487 return $result;
83             }
84              
85             1;