File Coverage

blib/lib/RxPerl/Functions.pm
Criterion Covered Total %
statement 45 50 90.0
branch 5 8 62.5
condition 1 3 33.3
subroutine 13 15 86.6
pod n/a
total 64 76 84.2


line stmt bran cond sub pod time code
1             package RxPerl::Functions;
2              
3 5     5   36 use strict;
  5         10  
  5         173  
4 5     5   26 use warnings;
  5         13  
  5         211  
5              
6             require RxPerl::Operators::Pipeable;
7              
8 5     5   27 use Carp 'croak';
  5         23  
  5         220  
9 5     5   30 use Scalar::Util 'blessed';
  5         10  
  5         221  
10              
11 5     5   31 use Exporter 'import';
  5         9  
  5         707  
12             our @EXPORT_OK = qw/
13             last_value_from first_value_from is_observable
14             /;
15             our %EXPORT_TAGS = (all => \@EXPORT_OK);
16              
17             our $VERSION = "v6.27.0";
18              
19             sub _promise_class {
20 4     4   30 my $fn = (caller(1))[3];
21 4         19 my $rx_class = $fn;
22 4         29 $rx_class =~ s/\:\:[^\:]+\z//;
23 5     5   35 no strict 'refs';
  5         21  
  5         2395  
24 4         9 my $promise_class = ${ "${rx_class}::promise_class" };
  4         16  
25 4 50       17 return wantarray ? ($promise_class, $rx_class) : $promise_class;
26             }
27              
28             sub last_value_from {
29 4     4   7820 my ($observable) = @_;
30              
31 4         11 my ($promise_class, $rx_class) = _promise_class;
32 4 50       12 $promise_class or croak "Promise class not set, set it with: ${rx_class}->set_promise_class(\$promise_class)";
33              
34 4         5 my ($promise, $resolve, $reject) = do {
35 4 50       10 if ($promise_class eq 'Future') {
36 0         0 my $future = Future->new;
37 0     0   0 ( $future, sub { $future->done(@_) }, sub { $future->fail(@_) } );
  0         0  
  0         0  
38             } else {
39 4         8 my ($res, $rej);
40             my $p = $promise_class->new(sub {
41 4     4   103 ($res, $rej) = @_;
42 4         25 });
43 4         33 ( $p, $res, $rej );
44             }
45             };
46              
47 4         7 my ($got_value, $last_value);
48             $observable->subscribe({
49             next => sub {
50 4     4   5 $last_value = $_[0];
51 4         10 $got_value = 1;
52             },
53             error => sub {
54 0     0   0 $reject->($_[0]);
55             },
56             complete => sub {
57 4 100   4   7 if ($got_value) {
58 2         7 $resolve->($last_value);
59             } else {
60 2         6 $reject->('no elements in sequence');
61             }
62             },
63 4         35 });
64              
65 4         70 return $promise;
66             }
67              
68             sub first_value_from {
69 2     2   17 my ($observable) = @_;
70 2         7 return last_value_from(
71             $observable->pipe(RxPerl::Operators::Pipeable::op_first())
72             );
73             }
74              
75             sub is_observable {
76 1     1   18 my ($thing) = @_;
77              
78 1   33     21 return !!(blessed($thing) && $thing->isa('RxPerl::Observable'));
79             }
80              
81             1;