File Coverage

blib/lib/Mojo/Rx/Observable.pm
Criterion Covered Total %
statement 33 38 86.8
branch 4 6 66.6
condition 3 5 60.0
subroutine 7 8 87.5
pod 0 3 0.0
total 47 60 78.3


line stmt bran cond sub pod time code
1             package Mojo::Rx::Observable;
2 2     2   15 use strict;
  2         4  
  2         65  
3 2     2   10 use warnings FATAL => 'all';
  2         4  
  2         66  
4              
5 2     2   862 use Mojo::Rx::Subscription;
  2         4  
  2         63  
6 2     2   791 use Mojo::Rx::Subscriber;
  2         6  
  2         64  
7              
8 2     2   12 use Scalar::Util 'reftype';
  2         4  
  2         765  
9              
10             # an observable is something you can subscribe to.
11              
12             # The class Mojo::Rx::Observable has a method 'new'
13             # (arguments) This method accepts a function as an argument.
14             # This function:
15             # - accepts a subscriber as its only argument
16             # - calls $subscriber->next,error,complete at its appropriate moments
17             # - returns a subref, which contains the cleanup required, when the subscriber wishes to unsubscribe
18             # (return) This method returns an instance of the Mojo::Rx::Observable
19             # This Mojo::Rx::Observable instance contains:
20             # - the function
21              
22             # Objects of the Mojo::Rx::Observable class have a 'subscribe' method
23             # (arguments) This method accepts zero to three arguments, which should be converted by the subscribe method to a clean hashref ('the subscriber') with the corresponding 0-3 keys
24             # (body) This method calls the $function that Mojo::Rx::Observable->new received as argument (and that initiates the subscription)
25             # (return) This method returns a new Mojo::Rx::Subscription object, that contains the "cleanup subref" returned by $function
26              
27             our $VERSION = "v0.13.0";
28              
29             sub new {
30 6     6 0 17 my ($class, $function) = @_;
31              
32 6         13 my $self = {function => $function};
33              
34 6         34 bless $self, $class;
35             }
36              
37             sub subscribe {
38 6     6 0 40 my ($self, @args) = @_;
39              
40 6         13 my $subscriber = {};
41 6         13 bless $subscriber, 'Mojo::Rx::Subscriber';
42              
43 6 50 50     32 if ((reftype($args[0]) // '') eq 'HASH') {
44 6 100       18 $args[0]{_subscription} = delete $args[0]{new_subscription} if $args[0]{new_subscription};
45 6         10 @$subscriber{qw/ next error complete _subscription /} = @{ $args[0] }{qw/ next error complete _subscription /};
  6         24  
46             } else {
47 0         0 @$subscriber{qw/ next error complete /} = @args;
48             }
49              
50 6   66     29 my $subscription = $subscriber->{_subscription} //= Mojo::Rx::Subscription->new;
51 6         13 $subscriber->{closed_ref} = \$subscription->{closed};
52              
53             # don't continue if the subscription has already closed (complete/error)
54 6 50       15 return $subscription if $subscription->{closed};
55              
56 6         17 $subscription->add_to_subscribers($subscriber);
57              
58 6         13 my $fn = $self->{function};
59              
60 6         15 my @cbs = $fn->($subscriber);
61              
62 6         34 $subscription->add_dependents(@cbs);
63              
64 6         92 return $subscription;
65             }
66              
67             sub pipe {
68 0     0 0   my ($self, @operators) = @_;
69              
70 0           my $result = $self;
71 0           $result = $_->($result) foreach @operators;
72              
73 0           return $result;
74             }
75              
76             1;