File Coverage

blib/lib/At/Protocol/Firehose.pm
Criterion Covered Total %
statement 32 38 84.2
branch 3 10 30.0
condition n/a
subroutine 6 6 100.0
pod n/a
total 41 54 75.9


line stmt bran cond sub pod time code
1 1     1   746 use v5.42;
  1         4  
2 1     1   9 use feature 'class';
  1         3  
  1         193  
3 1     1   9 use experimental 'try';
  1         2  
  1         9  
4 1     1   110 no warnings 'experimental::class';
  1         3  
  1         121  
5              
6             class At::Protocol::Firehose 1.0 {
7 1     1   8 use At::Error;
  1         2  
  1         8  
8             field $at : param;
9             field $url : param : reader //= 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos';
10             field $callback : param;
11             ADJUST {
12             try {
13             require CBOR::Free;
14              
15             # Ensure we have some way to decode sequences
16             my $has_seq = CBOR::Free->can('decode_sequence') || do {
17             my $ok = 0;
18             try { require CBOR::Free::SequenceDecoder; $ok = 1; }
19             catch ($e) { }
20             $ok;
21             };
22             die "CBOR::Free is too old (SequenceDecoder missing)" unless $has_seq;
23             }
24             catch ($e) {
25             die "CBOR::Free (with SequenceDecoder support) is required for the firehose. $e";
26             }
27             }
28              
29             method start() {
30 1         4 $at->http->websocket(
31 1     1   484 $url => sub ( $msg, $err ) {
  1         1  
  1         3  
32 1 50       4 if ($err) {
33 0         0 $callback->( undef, undef, $err );
34 0         0 return;
35             }
36 1         3 try {
37 1         3 my @objects;
38             {
39 1         3 local $SIG{__WARN__} = sub {
40 0 0       0 return if $_[0] =~ /Ignoring unrecognized CBOR tag #42/;
41 0         0 warn $_[0];
42 1         11 };
43              
44             # Try functional interface first (CBOR::Free 0.32+)
45 1         3 try {
46 1         20 @objects = CBOR::Free::decode_sequence($msg);
47             }
48             catch ($e) {
49              
50             # Fallback to SequenceDecoder
51 1         10 my $decoder = CBOR::Free::SequenceDecoder->new();
52 1 50       49 if ( my $sr = $decoder->give($msg) ) {
53 1         4 push @objects, $$sr;
54             }
55 1         9 while ( my $sr = $decoder->get() ) {
56 1         17 push @objects, $$sr;
57             }
58             }
59             }
60 1 50       6 if ( @objects >= 2 ) {
    0          
61 1         7 $callback->( $objects[0], $objects[1], undef );
62             }
63             elsif ( @objects == 1 ) {
64 0           $callback->( $objects[0], undef, At::Error->new( message => 'Incomplete firehose message', fatal => 0 ) );
65             }
66             }
67             catch ($e) {
68 0           $callback->( undef, undef, At::Error->new( message => "Firehose decode failed: $e", fatal => 0 ) );
69             }
70             }
71             );
72             }
73             }
74             1;
75             __END__