File Coverage

lib/At.pm
Criterion Covered Total %
statement 59 78 75.6
branch 0 4 0.0
condition n/a
subroutine 20 24 83.3
pod n/a
total 79 106 74.5


line stmt bran cond sub pod time code
1 3     3   612536 use v5.42;
  3         11  
2 3     3   22 use feature 'class';
  3         8  
  3         676  
3 3     3   20 no warnings 'experimental::class', 'experimental::builtin', 'experimental::for_list'; # Be quiet.
  3         5  
  3         397  
4              
5             #~ |---------------------------------------|
6             #~ |------3-33-----------------------------|
7             #~ |-5-55------4-44-5-55----353--3-33-/1~--|
8             #~ |---------------------335---33----------|
9             class At 1.5 {
10 3     3   21 use Carp qw[];
  3         8  
  3         93  
11 3     3   1711 use experimental 'try';
  3         13919  
  3         17  
12 3     3   2051 use File::ShareDir::Tiny qw[dist_dir];
  3         6933  
  3         243  
13 3     3   1764 use JSON::PP qw[decode_json encode_json];
  3         45536  
  3         284  
14 3     3   2047 use Path::Tiny qw[path];
  3         34484  
  3         256  
15 3     3   1741 use Digest::SHA qw[sha256];
  3         9773  
  3         320  
16 3     3   1490 use MIME::Base64 qw[encode_base64url];
  3         2166  
  3         200  
17 3     3   1802 use Crypt::PK::ECC;
  3         84198  
  3         260  
18 3     3   26 use Crypt::PRNG qw[random_string];
  3         6  
  3         168  
19 3     3   1690 use Time::Moment; # Internal; standardize around Zulu
  3         5312  
  3         138  
20 3     3   2079 use URI;
  3         20576  
  3         127  
21 3     3   39 use warnings::register;
  3         12  
  3         188  
22 3     3   1898 use At::Error;
  3         10  
  3         12  
23 3     3   1824 use At::Protocol::URI;
  3         11  
  3         222  
24 3     3   1607 use At::Protocol::Session;
  3         10  
  3         153  
25 3     3   1719 use At::UserAgent;
  3         13  
  3         10434  
26             field $share : reader : param = ();
27             field %lexicons : reader;
28             field $http : reader : param = ();
29             field $lexicon_paths_param : param(lexicon_paths) = [];
30             field @lexicon_paths;
31             field $host : param : reader //= 'bsky.social';
32             field $ipfs_node : param : reader //= undef;
33             field $bitswap;
34             method set_host ($new) { $host = $new }
35             field $session = ();
36             field $oauth_state;
37             field $dpop_key;
38             field %ratelimits : reader = ( # https://docs.bsky.app/docs/advanced-guides/rate-limits
39             global => {},
40             updateHandle => {}, # per DID
41             createSession => {}, # per handle
42             deleteAccount => {}, # by IP
43             resetPassword => {} # by IP
44             );
45             ADJUST {
46             if ($ipfs_node) {
47             require InterPlanetary::Protocol::Bitswap;
48             $bitswap = InterPlanetary::Protocol::Bitswap->new( node => $ipfs_node );
49             }
50             if ( !defined $share ) {
51             try { $share = dist_dir('At') }
52             catch ($e) { $share = 'share' }
53             }
54             $share = path($share) unless builtin::blessed $share;
55             @lexicon_paths = map { path($_) } ( ref $lexicon_paths_param eq 'ARRAY' ? @$lexicon_paths_param : ($lexicon_paths_param) );
56             if ( !defined $http ) {
57             my $ua_class;
58             try {
59             require Mojo::UserAgent;
60             $ua_class = 'At::UserAgent::Mojo';
61             }
62             catch ($e) {
63             $ua_class = 'At::UserAgent::Tiny';
64             }
65             $http = $ua_class->new();
66             }
67             $host = 'https://' . $host unless $host =~ /^https?:/;
68             $host = URI->new($host) unless builtin::blessed $host;
69             }
70              
71             # OAuth Implementation
72             method _get_dpop_key() {
73             unless ($dpop_key) {
74             $dpop_key = Crypt::PK::ECC->new();
75             $dpop_key->generate_key('secp256r1');
76             }
77             return $dpop_key;
78             }
79              
80             method oauth_discover ($handle) {
81             my $res = $self->resolve_handle($handle);
82             if ( builtin::blessed($res) && $res->isa('At::Error') ) { $res->throw; }
83             return unless $res && $res->{did};
84             my $pds = $self->pds_for_did( $res->{did} );
85             unless ($pds) { die 'Could not resolve PDS for DID: ' . $res->{did}; }
86             my ($protected) = $http->get( $pds . '/.well-known/oauth-protected-resource' );
87             if ( builtin::blessed($protected) && $protected->isa('At::Error') ) { $protected->throw; }
88             return unless $protected && $protected->{authorization_servers};
89             my $auth_server = $protected->{authorization_servers}[0];
90             my ($metadata) = $http->get( $auth_server . '/.well-known/oauth-authorization-server' );
91             if ( builtin::blessed($metadata) && $metadata->isa('At::Error') ) { $metadata->throw; }
92             return { pds => $pds, auth_server => $auth_server, metadata => $metadata, did => $res->{did} };
93             }
94              
95             method oauth_start ( $handle, $client_id, $redirect_uri, $scope = 'atproto' ) {
96             my $discovery = $self->oauth_discover($handle);
97             die 'Failed to discover OAuth metadata for ' . $handle unless $discovery;
98             my $chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~';
99             my $code_verifier = Crypt::PRNG::random_string_from( $chars, 43 );
100             my $code_challenge = encode_base64url( sha256($code_verifier) );
101             $code_challenge =~ s/=+$//;
102             my $state = Crypt::PRNG::random_string_from( $chars, 16 );
103             $oauth_state = {
104             discovery => $discovery,
105             code_verifier => $code_verifier,
106             state => $state,
107             redirect_uri => $redirect_uri,
108             client_id => $client_id,
109             handle => $handle,
110             scope => $scope
111             };
112              
113             # Prepare UA for DPoP
114             $http->set_tokens( undef, undef, 'DPoP', $self->_get_dpop_key() );
115             my $par_endpoint = $discovery->{metadata}{pushed_authorization_request_endpoint};
116             my $par_content = {
117             client_id => $client_id,
118             response_type => 'code',
119             code_challenge => $code_challenge,
120             code_challenge_method => 'S256',
121             redirect_uri => $redirect_uri,
122             state => $state,
123             scope => $scope,
124             aud => $discovery->{pds},
125             };
126             say '[DEBUG] [At] PAR request: ' . JSON::PP->new->ascii->encode($par_content) if $ENV{DEBUG};
127             my ($par_res) = $http->post(
128             $par_endpoint => {
129             headers => { DPoP => $http->_generate_dpop_proof( $par_endpoint, 'POST', 1 ) },
130             encoding => 'form',
131             content => $par_content,
132             skip_ath => 1
133             }
134             );
135             die 'PAR failed: ' . ( $par_res . '' ) if builtin::blessed $par_res;
136             say '[DEBUG] [At] PAR response: ' . JSON::PP->new->ascii->encode($par_res) if $ENV{DEBUG};
137             my $auth_uri = URI->new( $discovery->{metadata}{authorization_endpoint} );
138             $auth_uri->query_form( client_id => $client_id, request_uri => $par_res->{request_uri} );
139             return $auth_uri->as_string;
140             }
141              
142             method oauth_callback ( $code, $state ) {
143             die 'OAuth state mismatch' unless $oauth_state && $state eq $oauth_state->{state};
144             my $token_endpoint = $oauth_state->{discovery}{metadata}{token_endpoint};
145             my $key = $self->_get_dpop_key();
146             my ($token_res) = $http->post(
147             $token_endpoint => {
148             headers => { DPoP => $http->_generate_dpop_proof( $token_endpoint, 'POST', 1 ) },
149             encoding => 'form',
150             content => {
151             grant_type => 'authorization_code',
152             code => $code,
153             client_id => $oauth_state->{client_id},
154             redirect_uri => $oauth_state->{redirect_uri},
155             code_verifier => $oauth_state->{code_verifier},
156             aud => $oauth_state->{discovery}{pds}
157             },
158             skip_ath => 1
159             }
160             );
161             die 'Token exchange failed: ' . ( $token_res . '' ) if builtin::blessed $token_res;
162             say '[DEBUG] [At] Token response: ' . JSON::PP->new->ascii->encode($token_res) if $ENV{DEBUG};
163             $session = At::Protocol::Session->new(
164             did => $token_res->{sub},
165             accessJwt => $token_res->{access_token},
166             refreshJwt => $token_res->{refresh_token},
167             handle => $oauth_state->{handle},
168             token_type => 'DPoP',
169             dpop_key_jwk => $key->export_key_jwk('private'),
170             client_id => $oauth_state->{client_id},
171             scope => $token_res->{scope},
172             pds => $oauth_state->{discovery}{pds}
173             );
174             $self->set_host( $oauth_state->{discovery}{pds} );
175             $http->set_tokens( $token_res->{access_token}, $token_res->{refresh_token}, 'DPoP', $key );
176             }
177              
178             method oauth_refresh() {
179             return unless $session && $session->refreshJwt && $session->token_type eq 'DPoP';
180             my $discovery = $self->oauth_discover( $session->handle );
181             return unless $discovery;
182             my $token_endpoint = $discovery->{metadata}{token_endpoint};
183             my $key = $self->_get_dpop_key();
184             my $refresh_content = {
185             grant_type => 'refresh_token',
186             refresh_token => $session->refreshJwt,
187             client_id => $session->client_id // '',
188             aud => $discovery->{pds},
189             };
190             say '[DEBUG] [At] Refresh request: ' . JSON::PP->new->ascii->encode($refresh_content) if $ENV{DEBUG};
191             my ($token_res) = $http->post(
192             $token_endpoint => {
193             headers => { DPoP => $http->_generate_dpop_proof( $token_endpoint, 'POST', 1 ) },
194             encoding => 'form',
195             content => $refresh_content,
196             skip_ath => 1
197             }
198             );
199             die 'Refresh failed: ' . ( $token_res . '' ) if builtin::blessed $token_res;
200             $session = At::Protocol::Session->new(
201             did => $token_res->{sub},
202             accessJwt => $token_res->{access_token},
203             refreshJwt => $token_res->{refresh_token},
204             handle => $session->handle,
205             token_type => 'DPoP',
206             dpop_key_jwk => $key->export_key_jwk('private'),
207             client_id => $session->client_id,
208             scope => $token_res->{scope},
209             pds => $discovery->{pds}
210             );
211             $self->set_host( $discovery->{pds} );
212             return $http->set_tokens( $token_res->{access_token}, $token_res->{refresh_token}, 'DPoP', $key );
213             }
214              
215             method collection_scope ( $collection, $action = 'create' ) {
216             return "repo:$collection?action=$action";
217             }
218              
219             # Legacy Auth
220             method login( $identifier, $password ) {
221             warnings::warnif( At => 'login() (com.atproto.server.createSession) is deprecated. Please use OAuth instead.' );
222             my $res = $self->post( 'com.atproto.server.createSession' => { identifier => $identifier, password => $password } );
223             if ( $res && !builtin::blessed($res) ) { $session = At::Protocol::Session->new(%$res); }
224             else { $session = $res; }
225             return $session ? $http->set_tokens( $session->accessJwt, $session->refreshJwt, undef, undef ) : $session;
226             }
227              
228             method resume ( $accessJwt, $refreshJwt, $token_type = 'Bearer', $dpop_key_jwk = (), $client_id = (), $handle = (), $pds = (), $scope = () ) {
229             my $access = $self->_decode_token($accessJwt);
230             my $refresh = $self->_decode_token($refreshJwt);
231             return unless $access;
232             my $key;
233             if ( $token_type eq 'DPoP' && $dpop_key_jwk ) {
234             $key = Crypt::PK::ECC->new();
235             $key->import_key( \$dpop_key_jwk );
236             $dpop_key = $key;
237             }
238             if ( $refresh && time > $access->{exp} && time < $refresh->{exp} ) {
239             if ( $token_type eq 'DPoP' ) { return $self->oauth_refresh(); }
240             else {
241             my $res = $self->post( 'com.atproto.server.refreshSession' => { refreshJwt => $refreshJwt } );
242             if ( $res && !builtin::blessed($res) ) { $session = At::Protocol::Session->new(%$res); }
243             else { $session = $res; }
244             return $session ? $http->set_tokens( $session->accessJwt, $session->refreshJwt, $token_type, $key ) : $session;
245             }
246             }
247             $session = At::Protocol::Session->new(
248             did => $access->{sub},
249             accessJwt => $accessJwt,
250             refreshJwt => $refreshJwt,
251             token_type => $token_type,
252             dpop_key_jwk => $dpop_key_jwk,
253             client_id => $client_id,
254             handle => $handle,
255             pds => $pds,
256             scope => $scope
257             );
258             $self->set_host($pds) if $pds;
259             return $http->set_tokens( $accessJwt, $refreshJwt, $token_type, $key );
260             }
261              
262             method _decode_token ($token) {
263             return unless defined $token;
264 3     3   32 use MIME::Base64 qw[decode_base64];
  3         5  
  3         18167  
265             my ( $header, $payload, $sig ) = split /\./, $token;
266             return unless defined $payload;
267             $payload =~ tr[-_][+/];
268             try {
269             return decode_json decode_base64 $payload;
270             }
271             catch ($e) {
272             return;
273             }
274             }
275              
276             # XRPC & Lexicons
277             method _locate_lexicon($fqdn) {
278             unless ( defined $lexicons{$fqdn} ) {
279             my $base_fqdn = $fqdn =~ s[#(.+)$][]r;
280             my @namespace = split /\./, $base_fqdn;
281             my @search = (
282             @lexicon_paths,
283             $share->child('lexicons'),
284             defined $ENV{HOME} ? path( $ENV{HOME}, '.cache', 'atproto', 'lexicons' ) : (),
285             path( 'share', 'lexicons' )
286             );
287             my $lex_file;
288             for my $dir (@search) {
289             next unless defined $dir;
290             my $possible = $dir->child( @namespace[ 0 .. $#namespace - 1 ], $namespace[-1] . '.json' );
291             if ( $possible->exists ) { $lex_file = $possible; last; }
292             }
293             if ( !$lex_file ) { $lex_file = $self->_fetch_lexicon($base_fqdn); }
294             if ( $lex_file && $lex_file->exists ) {
295             my $json = decode_json $lex_file->slurp_raw;
296             for my $def ( keys %{ $json->{defs} } ) {
297             $lexicons{ $base_fqdn . ( $def eq 'main' ? '' : '#' . $def ) } = $json->{defs}{$def};
298             $lexicons{ $base_fqdn . '#main' } = $json->{defs}{$def} if $def eq 'main';
299             }
300             }
301             }
302             $lexicons{$fqdn};
303             }
304              
305             method _fetch_lexicon($base_fqdn) {
306             my @namespace = split /\./, $base_fqdn;
307             my $rel_path = join( '/', @namespace[ 0 .. $#namespace - 1 ], $namespace[-1] . '.json' );
308             my $url = 'https://raw.githubusercontent.com/bluesky-social/atproto/main/lexicons/' . $rel_path;
309             my ( $content, $headers ) = $http->get($url);
310             if ( $content && !builtin::blessed($content) ) {
311             my $cache_dir = defined $ENV{HOME} ? path( $ENV{HOME}, '.cache', 'atproto', 'lexicons' ) : path( '.cache', 'atproto', 'lexicons' );
312             $cache_dir->mkpath;
313             my $lex_file = $cache_dir->child( @namespace[ 0 .. $#namespace - 1 ], $namespace[-1] . '.json' );
314             $lex_file->parent->mkpath;
315             $lex_file->spew_raw( builtin::blessed($content) ? encode_json($content) : $content );
316             return $lex_file;
317             }
318             return;
319             }
320              
321             method get( $fqdn, $args = (), $headers = {} ) {
322             my $lexicon = $self->_locate_lexicon($fqdn);
323             my $category = $fqdn =~ /^com\.atproto\.repo\./ ? 'repo' : 'global';
324             my $meta = $category eq 'repo' ? ( $args->{repo} // $self->did ) : ();
325             $self->_ratecheck( $category, $meta );
326             my ( $content, $res_headers )
327             = $http->get( sprintf( '%s/xrpc/%s', $host, $fqdn ), { defined $args ? ( content => $args ) : (), headers => $headers } );
328             $self->ratelimit_( $res_headers, $category, $meta );
329             if ( $lexicon && !builtin::blessed $content ) {
330             $content = $self->_coerce( $fqdn, $lexicon->{output}{schema}, $content );
331             }
332             wantarray ? ( $content, $res_headers ) : $content;
333             }
334              
335             method post( $fqdn, $args = (), $headers = {} ) {
336             my @namespace = split /\./, $fqdn;
337             my $lexicon = $self->_locate_lexicon($fqdn);
338              
339             # Categorize according to bsky specs
340             my $category = 'global';
341             my $meta = ();
342             if ( $fqdn =~ /^com\.atproto\.server\.createSession$/ ) {
343             $category = 'auth';
344             $meta = $args->{identifier};
345             }
346             elsif ( $fqdn =~ /^com\.atproto\.repo\./ ) {
347             $category = 'repo';
348             $meta = $args->{repo} // $self->did;
349             }
350             elsif ( $namespace[-1] =~ m[^(updateHandle|createAccount|deleteAccount|resetPassword)$] ) {
351             $category = $namespace[-1];
352             $meta = $args->{did} // $args->{handle} // $args->{email};
353             }
354             $self->_ratecheck( $category, $meta );
355             my ( $content, $res_headers )
356             = $http->post( sprintf( '%s/xrpc/%s', $host, $fqdn ), { defined $args ? ( content => $args ) : (), headers => $headers } );
357             $self->ratelimit_( $res_headers, $category, $meta );
358             if ( $lexicon && !builtin::blessed $content ) {
359             $content = $self->_coerce( $fqdn, $lexicon->{output}{schema}, $content );
360             }
361             return wantarray ? ( $content, $res_headers ) : $content;
362             }
363             method subscribe( $id, $cb ) { $self->http->websocket( sprintf( '%s/xrpc/%s', $host, $id ), $cb ); }
364              
365             method firehose ( $callback, $url = () ) {
366             require At::Protocol::Firehose;
367             return At::Protocol::Firehose->new( at => $self, callback => $callback, defined $url ? ( url => $url ) : () );
368             }
369              
370             # Coercion Logic
371             my %coercions = (
372             array => method( $namespace, $schema, $data ) {
373             [ map { $self->_coerce( $namespace, $schema->{items}, $_ ) } @$data ]
374             },
375             boolean => method( $namespace, $schema, $data ) { !!$data },
376             bytes => method( $namespace, $schema, $data ) {$data},
377             blob => method( $namespace, $schema, $data ) {$data},
378             integer => method( $namespace, $schema, $data ) { int $data },
379             object => method( $namespace, $schema, $data ) {
380             for my ( $name, $subschema )( %{ $schema->{properties} } ) {
381             $data->{$name} = $self->_coerce( $namespace, $subschema, $data->{$name} );
382             }
383             $data;
384             },
385             ref => method( $namespace, $schema, $data ) {
386             my $target_namespace = $self->_resolve_namespace( $namespace, $schema->{ref} );
387             my $lexicon = $self->_locate_lexicon($target_namespace);
388             return $data unless $lexicon;
389             $self->_coerce( $target_namespace, $lexicon, $data );
390             },
391             union => method( $namespace, $schema, $data ) {$data},
392             unknown => method( $namespace, $schema, $data ) {$data},
393             string => method( $namespace, $schema, $data ) {
394             $data // return ();
395             if ( defined $schema->{format} ) {
396             if ( $schema->{format} eq 'uri' ) { return URI->new($data); }
397             elsif ( $schema->{format} eq 'at-uri' ) { return At::Protocol::URI->new($data); }
398             elsif ( $schema->{format} eq 'datetime' ) {
399             return $data =~ /\D/ ? Time::Moment->from_string($data) : Time::Moment->from_epoch($data);
400             }
401             elsif ( $schema->{format} eq 'did' ) {
402             require At::Protocol::DID;
403             return At::Protocol::DID->new($data);
404             }
405             elsif ( $schema->{format} eq 'handle' ) {
406             require At::Protocol::Handle;
407             try { return At::Protocol::Handle->new($data); }
408             catch ($e) { return $data; }
409             }
410             }
411             $data;
412             }
413             );
414              
415             method _coerce ( $namespace, $schema, $data ) {
416             $data // return ();
417             return $coercions{ $schema->{type} }->( $self, $namespace, $schema, $data ) if defined $coercions{ $schema->{type} };
418             return $data;
419             }
420              
421             method _resolve_namespace ( $l, $r ) {
422             return $r if $r =~ m[.+#];
423             return $` . $r if $l =~ m[#.+];
424             $l . $r;
425             }
426              
427             # Identity & Helpers
428             method did() { $session ? $session->did . '' : undef; }
429             method resolve_handle($handle) { $self->get( 'com.atproto.identity.resolveHandle' => { handle => $handle } ); }
430              
431             method resolve_did_to_handle ($did) {
432             my $doc = $self->resolve_did($did);
433             return $doc->{alsoKnownAs}[0] =~ s/^at:\/\///r if $doc && $doc->{alsoKnownAs};
434             return;
435             }
436              
437             method upload_blob ( $data, $mime_type ) {
438             $self->post( 'com.atproto.repo.uploadBlob' => { content => $data, headers => { 'Content-Type' => $mime_type } } );
439             }
440              
441             method create_record ( $collection, $record, $rkey = undef ) {
442             $self->post( 'com.atproto.repo.createRecord' =>
443             { repo => $self->did, collection => $collection, record => $record, defined $rkey ? ( rkey => $rkey ) : () } );
444             }
445              
446             method delete_record ( $collection, $rkey ) {
447             $self->post( 'com.atproto.repo.deleteRecord' => { repo => $self->did, collection => $collection, rkey => $rkey } );
448             }
449              
450             method put_record ( $collection, $rkey, $record, $swapRecord = undef ) {
451             $self->post(
452             'com.atproto.repo.putRecord' => {
453             repo => $self->did,
454             collection => $collection,
455             rkey => $rkey,
456             record => $record,
457             defined $swapRecord ? ( swapRecord => $swapRecord ) : ()
458             }
459             );
460             }
461              
462             method apply_writes ( $writes, $swapCommit = undef ) {
463             $self->post(
464             'com.atproto.repo.applyWrites' => { repo => $self->did, writes => $writes, defined $swapCommit ? ( swapCommit => $swapCommit ) : () } );
465             }
466              
467             method resolve_did ($did) {
468             if ( $did =~ /^did:plc:(.+)$/ ) {
469             my ($content) = $http->get( 'https://plc.directory/' . $did );
470             return $content;
471             }
472             elsif ( $did =~ /^did:web:(.+)$/ ) {
473             my $domain = $1;
474             $domain =~ s/:/\//g;
475             my ($content) = $http->get("https://$domain/.well-known/did.json");
476             return $content;
477             }
478             return;
479             }
480              
481             method pds_for_did ($did) {
482             my $doc = $self->resolve_did($did);
483             return unless $doc && ref $doc eq 'HASH' && $doc->{service};
484             for my $service ( @{ $doc->{service} } ) {
485             return $service->{serviceEndpoint} if $service->{type} eq 'AtprotoPersonalDataServer';
486             }
487             return;
488             }
489              
490             method peer_id_for_did ($did) {
491             my $doc = $self->resolve_did($did);
492             return unless $doc && ref $doc eq 'HASH' && $doc->{verificationMethod};
493              
494             # Look for the primary signing key (usually the first one)
495             my $vm = $doc->{verificationMethod}[0];
496             my $pub_key_multibase = $vm->{publicKeyMultibase} // return;
497              
498             # publicKeyMultibase for secp256k1 in atproto usually starts with 'z' (base58btc)
499             # and has a multicodec prefix.
500             require InterPlanetary::Multibase;
501             my $raw = InterPlanetary::Multibase->decode($pub_key_multibase);
502              
503             # For secp256k1 (0xe7 multicodec), we need to extract the 33-byte compressed key
504             # and wrap it in the libp2p Protobuf PublicKey.
505             require Net::Libp2p::Crypto;
506             require InterPlanetary::Utils;
507              
508             # Simplified: assume it's secp256k1 for now as per bsky standard
509             my $key_bytes = substr( $raw, 2 ); # Skip multicodec prefix (usually 2 bytes for 0xe7)
510              
511             # Wrap in libp2p Protobuf (Type 2 = Secp256k1)
512             my $pk_pb = pack( 'C', ( 1 << 3 ) | 0 ) . InterPlanetary::Utils::encode_varint(2);
513             $pk_pb .= pack( 'C', ( 2 << 3 ) | 2 ) . InterPlanetary::Utils::encode_varint( length($key_bytes) ) . $key_bytes;
514             return Net::Libp2p::Crypto->peer_id_from_public_key($pk_pb);
515             }
516             method session() { $session //= $self->get('com.atproto.server.getSession'); $session; }
517              
518             method get_block ( $cid_str, $target_peer_id = undef, $did = undef ) {
519             if ($ipfs_node) {
520             my $cid = InterPlanetary::CID->from_string($cid_str);
521             my $data = $ipfs_node->blockstore->get($cid);
522             return Future->done($data) if $data;
523             if ($target_peer_id) {
524              
525             #~ say "[IPFS] Block $cid_str not found locally, attempting Bitswap from $target_peer_id...";
526 0           return $ipfs_node->host->dial( $target_peer_id, '/ipfs/bitswap/1.2.0' )->then(
527 0     0     sub ($ss) {
  0            
528              
529             # The local bitswap handler needs to 'own' this stream to read the response
530 0           $bitswap->handle_stream($ss);
531 0           return $bitswap->request_block( $ss, $cid );
532             }
533             )->else(
534             sub {
535 0     0     my ($e) = @_;
536              
537             #~ say "[IPFS] Bitswap failed: $e. Falling back to HTTP...";
538 0           return $self->_get_block_http( $cid_str, $did );
539             }
540             );
541             }
542             }
543             return $self->_get_block_http( $cid_str, $did );
544             }
545              
546             method _get_block_http ( $cid_str, $did ) {
547              
548             # If no DID provided, we can't fallback to sync endpoints
549             return Future->done(undef) unless $did;
550              
551             #~ say "[HTTP] Fetching block $cid_str for $did via com.atproto.sync.getBlocks...";
552             # com.atproto.sync.getBlocks returns a CAR file
553             return Future->call(
554             sub {
555 0     0     my $car_data = $self->get( 'com.atproto.sync.getBlocks' => { did => $did, cids => [$cid_str] } );
556 0 0         return undef unless $car_data;
557 0           require Archive::CAR;
558 0           require Archive::CAR::v1;
559 0           my $car = Archive::CAR::v1->new();
560 0           open my $fh, '<', \$car_data;
561 0           $car->read($fh);
562              
563             # Archive::CAR blocks is an arrayref of { cid => CIDobj, data => ... }
564 0           for my $block ( @{ $car->blocks } ) {
  0            
565              
566             # CIDs are complex objects, to_string should work for matching
567             # But the CID in the CAR might be different multibase than $cid_str
568             # For now, simplistic comparison or raw check
569 0 0         return $block->{data} if $block->{cid}->to_string eq $cid_str;
570             }
571 0           return undef;
572             }
573             );
574             }
575             method get_repo_head ($did) { $self->get( 'com.atproto.sync.getHead' => { did => $did } ) }
576 0     0     sub _now { Time::Moment->now }
577             method _duration ($seconds) { $seconds || return '0 seconds'; $seconds = abs $seconds; return $seconds . ' seconds' }
578              
579             method ratelimit_ ( $headers, $type, $meta //= () ) {
580             my %h = map { lc($_) => $headers->{$_} } keys %$headers;
581             return unless exists $h{'ratelimit-limit'};
582             my $rate = {
583             limit => $h{'ratelimit-limit'},
584             remaining => $h{'ratelimit-remaining'},
585             reset => $h{'ratelimit-reset'},
586             policy => $h{'ratelimit-policy'},
587             };
588             defined $meta ? $ratelimits{$type}{$meta} = $rate : $ratelimits{$type} = $rate;
589             }
590              
591             method _ratecheck( $type, $meta //= () ) {
592             my $rate = defined $meta ? $ratelimits{$type}{$meta} : $ratelimits{$type};
593             return unless $rate && $rate->{reset};
594             if ( $rate->{remaining} <= 0 && time < $rate->{reset} ) {
595             my $wait = $rate->{reset} - time;
596             warnings::warnif( At => "Rate limit exceeded for $type. Reset in $wait seconds." );
597             }
598             elsif ( $rate->{remaining} < ( $rate->{limit} * 0.1 ) ) {
599             warnings::warnif( At => "Approaching rate limit for $type ($rate->{remaining} remaining)." );
600             }
601             }
602             }
603             1;
604             __END__