File Coverage

lib/At.pm
Criterion Covered Total %
statement 59 78 75.6
branch 0 4 0.0
condition n/a
subroutine 20 24 83.3
pod n/a
total 79 106 74.5


line stmt bran cond sub pod time code
1 3     3   671722 use v5.42;
  3         11  
2 3     3   17 use feature 'class';
  3         10  
  3         611  
3 3     3   22 no warnings 'experimental::class', 'experimental::builtin', 'experimental::for_list'; # Be quiet.
  3         16  
  3         301  
4              
5             #~ |---------------------------------------|
6             #~ |------3-33-----------------------------|
7             #~ |-5-55------4-44-5-55----353--3-33-/1~--|
8             #~ |---------------------335---33----------|
9             class At 1.6 {
10 3     3   19 use Carp qw[];
  3         5  
  3         74  
11 3     3   1748 use experimental 'try';
  3         12557  
  3         15  
12 3     3   1929 use File::ShareDir::Tiny qw[dist_dir];
  3         6671  
  3         320  
13 3     3   1748 use JSON::PP qw[decode_json encode_json];
  3         44782  
  3         273  
14 3     3   2274 use Path::Tiny qw[path];
  3         45962  
  3         258  
15 3     3   2150 use Digest::SHA qw[sha256];
  3         10653  
  3         322  
16 3     3   1819 use MIME::Base64 qw[encode_base64url];
  3         2371  
  3         206  
17 3     3   1930 use Crypt::PK::ECC;
  3         86283  
  3         222  
18 3     3   32 use Crypt::PRNG qw[random_string];
  3         5  
  3         154  
19 3     3   1545 use Time::Moment; # Internal; standardize around Zulu
  3         4576  
  3         95  
20 3     3   2010 use URI;
  3         20447  
  3         135  
21 3     3   57 use warnings::register;
  3         7  
  3         212  
22 3     3   1701 use At::Error;
  3         9  
  3         10  
23 3     3   1800 use At::Protocol::URI;
  3         14  
  3         295  
24 3     3   1973 use At::Protocol::Session;
  3         10  
  3         175  
25 3     3   1674 use At::UserAgent;
  3         10  
  3         8469  
               # --- Object state -------------------------------------------------
               # Shared-data directory. Optional constructor parameter; when it is
               # not supplied, ADJUST falls back to dist_dir('At') and then to the
               # literal 'share', and wraps plain values with path().
26             field $share : reader : param = ();
               # Lexicon definitions keyed by name; filled lazily by _locate_lexicon.
27             field %lexicons : reader;
               # HTTP user agent. Optional constructor parameter; ADJUST builds one
               # when it is not supplied.
28             field $http : reader : param = ();
               # Constructor parameter 'lexicon_paths': a single path or an array
               # reference of paths. ADJUST converts it into @lexicon_paths.
29             field $lexicon_paths_param : param(lexicon_paths) = [];
30             field @lexicon_paths;
               # Service host. ADJUST prefixes 'https://' when no http(s) scheme is
               # present and wraps non-object values with URI->new.
31             field $host : param : reader //= 'bsky.social';
               # Optional IPFS node; when set, ADJUST creates the $bitswap helper.
32             field $ipfs_node : param : reader //= undef;
33             field $bitswap;
               # Replaces $host with the given value as-is (no normalisation here).
34             method set_host ($new) { $host = $new }
               # Current session, pending OAuth flow state (set by oauth_start) and
               # the DPoP key (created on demand by _get_dpop_key or imported by resume).
35             field $session = ();
36             field $oauth_state;
37             field $dpop_key;
               # Most recent rate-limit data, written by ratelimit_ and read by _ratecheck.
38             field %ratelimits : reader = ( # https://docs.bsky.app/docs/advanced-guides/rate-limits
39             global => {},
40             updateHandle => {}, # per DID
41             createSession => {}, # per handle
42             deleteAccount => {}, # by IP
43             resetPassword => {} # by IP
44             );
# Construction hook: fills in defaults and normalises the constructor
# parameters (IPFS helper, share directory, lexicon search paths, user agent
# and host URI).
ADJUST {
    # Bitswap support is only loaded when an IPFS node was handed in.
    if ($ipfs_node) {
        require InterPlanetary::Protocol::Bitswap;
        $bitswap = InterPlanetary::Protocol::Bitswap->new( node => $ipfs_node );
    }

    # Share directory: installed dist dir first, local 'share' as a fallback.
    unless ( defined $share ) {
        try { $share = dist_dir('At') }
        catch ($e) { $share = 'share' }
    }
    $share = path($share) if !builtin::blessed $share;

    # Accept either a single path or an array reference of paths.
    my @raw_paths = ref $lexicon_paths_param eq 'ARRAY' ? @$lexicon_paths_param : ($lexicon_paths_param);
    @lexicon_paths = map { path($_) } @raw_paths;

    # Pick a user agent backend: Mojo when available, otherwise the Tiny one.
    unless ( defined $http ) {
        my $ua_class = 'At::UserAgent::Tiny';
        try {
            require Mojo::UserAgent;
            $ua_class = 'At::UserAgent::Mojo';
        }
        catch ($e) {
            $ua_class = 'At::UserAgent::Tiny';
        }
        $http = $ua_class->new();
    }

    # Hosts given without a scheme are treated as https.
    $host = 'https://' . $host if $host !~ /^https?:/;
    $host = URI->new($host)    if !builtin::blessed $host;
}
70              
71             # OAuth Implementation
# Returns the DPoP key pair for this object, generating a secp256r1 key the
# first time it is requested.
method _get_dpop_key() {
    return $dpop_key if $dpop_key;
    $dpop_key = Crypt::PK::ECC->new();
    $dpop_key->generate_key('secp256r1');
    return $dpop_key;
}
79              
# Discovers the OAuth endpoints for a handle.
#
# Resolves the handle to a DID, the DID to its PDS, then fetches the PDS's
# protected-resource document and the first listed authorization server's
# metadata.
#
# Returns a hash reference { pds, auth_server, metadata, did }, or nothing
# when the handle cannot be resolved or a document lacks the expected data.
# Throws when a lookup returns an At::Error, and dies when no PDS is found.
method oauth_discover ($handle) {
    my $res = $self->resolve_handle($handle);
    $res->throw if builtin::blessed($res) && $res->isa('At::Error');
    return unless $res && $res->{did};
    my $did = $res->{did};

    my $pds = $self->pds_for_did($did);
    die 'Could not resolve PDS for DID: ' . $did unless $pds;

    my ($protected) = $http->get( $pds . '/.well-known/oauth-protected-resource' );
    $protected->throw if builtin::blessed($protected) && $protected->isa('At::Error');

    # Only a decoded JSON object with a non-empty server list is usable;
    # anything else would otherwise produce an undefined URL below.
    return unless ref($protected) eq 'HASH';
    my $servers = $protected->{authorization_servers};
    return unless ref($servers) eq 'ARRAY' && defined $servers->[0];
    my $auth_server = $servers->[0];

    my ($metadata) = $http->get( $auth_server . '/.well-known/oauth-authorization-server' );
    $metadata->throw if builtin::blessed($metadata) && $metadata->isa('At::Error');

    # Callers index into the metadata as a hash, so reject other shapes here.
    return unless ref($metadata) eq 'HASH';
    return { pds => $pds, auth_server => $auth_server, metadata => $metadata, did => $did };
}
94              
# Starts an OAuth authorization-code flow with PKCE, PAR and DPoP.
#
# Parameters: the user's handle, the client id, the redirect URI and the
# requested scope (default 'atproto').
# Side effects: stores the pending flow in $oauth_state and installs the DPoP
# key on the user agent.
# Returns the authorization URL (string) the user should be sent to.
# Dies when discovery fails, when the server metadata lacks a required
# endpoint, or when the pushed authorization request fails.
method oauth_start ( $handle, $client_id, $redirect_uri, $scope = 'atproto' ) {
    my $discovery = $self->oauth_discover($handle);
    die 'Failed to discover OAuth metadata for ' . $handle unless $discovery;

    # Fail early with a clear message rather than posting to an undefined URL.
    my $par_endpoint  = $discovery->{metadata}{pushed_authorization_request_endpoint};
    my $auth_endpoint = $discovery->{metadata}{authorization_endpoint};
    die 'OAuth metadata is missing pushed_authorization_request_endpoint' unless defined $par_endpoint;
    die 'OAuth metadata is missing authorization_endpoint'                unless defined $auth_endpoint;

    # PKCE verifier/challenge pair and anti-CSRF state.
    my $chars         = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~';
    my $code_verifier = Crypt::PRNG::random_string_from( $chars, 43 );

    # encode_base64url already emits unpadded output, so no trimming is needed.
    my $code_challenge = encode_base64url( sha256($code_verifier) );
    my $state          = Crypt::PRNG::random_string_from( $chars, 16 );
    $oauth_state = {
        discovery     => $discovery,
        code_verifier => $code_verifier,
        state         => $state,
        redirect_uri  => $redirect_uri,
        client_id     => $client_id,
        handle        => $handle,
        scope         => $scope
    };

    # Prepare UA for DPoP
    $http->set_tokens( undef, undef, 'DPoP', $self->_get_dpop_key() );
    my $par_content = {
        client_id             => $client_id,
        response_type         => 'code',
        code_challenge        => $code_challenge,
        code_challenge_method => 'S256',
        redirect_uri          => $redirect_uri,
        state                 => $state,
        scope                 => $scope,
        aud                   => $discovery->{pds},
    };
    say '[DEBUG] [At] PAR request: ' . JSON::PP->new->ascii->encode($par_content) if $ENV{DEBUG};
    my ($par_res) = $http->post(
        $par_endpoint => {
            headers  => { DPoP => $http->_generate_dpop_proof( $par_endpoint, 'POST', 1 ) },
            encoding => 'form',
            content  => $par_content,
            skip_ath => 1
        }
    );
    die 'PAR failed: ' . ( $par_res . '' ) if builtin::blessed $par_res;
    die 'PAR failed: response did not include a request_uri' unless ref($par_res) eq 'HASH' && defined $par_res->{request_uri};
    say '[DEBUG] [At] PAR response: ' . JSON::PP->new->ascii->encode($par_res) if $ENV{DEBUG};

    my $auth_uri = URI->new($auth_endpoint);
    $auth_uri->query_form( client_id => $client_id, request_uri => $par_res->{request_uri} );
    return $auth_uri->as_string;
}
141              
# Completes the OAuth flow started by oauth_start.
#
# Parameters: the authorization code and the state value received on the
# redirect.
# Exchanges the code for tokens, stores a new session, switches the host to
# the discovered PDS and installs the tokens on the user agent.
# Returns whatever the user agent's set_tokens returns.
# Dies on a state mismatch or when the token exchange fails.
method oauth_callback ( $code, $state ) {

    # defined() avoids an 'uninitialized' warning when no state was passed.
    die 'OAuth state mismatch' unless $oauth_state && defined $state && $state eq $oauth_state->{state};
    my $pending        = $oauth_state;
    my $token_endpoint = $pending->{discovery}{metadata}{token_endpoint};
    my $key            = $self->_get_dpop_key();
    my ($token_res) = $http->post(
        $token_endpoint => {
            headers  => { DPoP => $http->_generate_dpop_proof( $token_endpoint, 'POST', 1 ) },
            encoding => 'form',
            content  => {
                grant_type    => 'authorization_code',
                code          => $code,
                client_id     => $pending->{client_id},
                redirect_uri  => $pending->{redirect_uri},
                code_verifier => $pending->{code_verifier},
                aud           => $pending->{discovery}{pds}
            },
            skip_ath => 1
        }
    );
    die 'Token exchange failed: ' . ( $token_res . '' ) if builtin::blessed $token_res;
    die 'Token exchange failed: unexpected response' unless ref($token_res) eq 'HASH';
    say '[DEBUG] [At] Token response: ' . JSON::PP->new->ascii->encode($token_res) if $ENV{DEBUG};
    $session = At::Protocol::Session->new(
        did          => $token_res->{sub},
        accessJwt    => $token_res->{access_token},
        refreshJwt   => $token_res->{refresh_token},
        handle       => $pending->{handle},
        token_type   => 'DPoP',
        dpop_key_jwk => $key->export_key_jwk('private'),
        client_id    => $pending->{client_id},
        scope        => $token_res->{scope},
        pds          => $pending->{discovery}{pds}
    );
    $self->set_host( $pending->{discovery}{pds} );

    # The state and code verifier are single-use; drop them so the same
    # callback cannot be replayed against this object.
    $oauth_state = undef;
    return $http->set_tokens( $token_res->{access_token}, $token_res->{refresh_token}, 'DPoP', $key );
}
177              
# Refreshes a DPoP (OAuth) session using its refresh token.
#
# Returns nothing when there is no DPoP session with a refresh token or when
# discovery fails; otherwise replaces the session, updates the host and
# returns whatever the user agent's set_tokens returns.
# Dies when the token endpoint reports an error.
method oauth_refresh() {
    return unless $session && $session->refreshJwt && $session->token_type eq 'DPoP';
    my $discovery = $self->oauth_discover( $session->handle );
    return unless $discovery;
    my $token_endpoint  = $discovery->{metadata}{token_endpoint};
    my $key             = $self->_get_dpop_key();
    my $refresh_content = {
        grant_type    => 'refresh_token',
        refresh_token => $session->refreshJwt,
        client_id     => $session->client_id // '',
        aud           => $discovery->{pds},
    };
    say '[DEBUG] [At] Refresh request: ' . JSON::PP->new->ascii->encode($refresh_content) if $ENV{DEBUG};
    my ($token_res) = $http->post(
        $token_endpoint => {
            headers  => { DPoP => $http->_generate_dpop_proof( $token_endpoint, 'POST', 1 ) },
            encoding => 'form',
            content  => $refresh_content,
            skip_ath => 1
        }
    );
    die 'Refresh failed: ' . ( $token_res . '' ) if builtin::blessed $token_res;
    die 'Refresh failed: unexpected response' unless ref($token_res) eq 'HASH';

    # A token response may omit fields that did not change. Keep the values
    # already held by the session instead of overwriting them with undef,
    # which would make the session impossible to refresh again.
    my $refresh_token = $token_res->{refresh_token} // $session->refreshJwt;
    my $did           = $token_res->{sub}           // $session->did;
    $session = At::Protocol::Session->new(
        did          => $did,
        accessJwt    => $token_res->{access_token},
        refreshJwt   => $refresh_token,
        handle       => $session->handle,
        token_type   => 'DPoP',
        dpop_key_jwk => $key->export_key_jwk('private'),
        client_id    => $session->client_id,
        scope        => $token_res->{scope},
        pds          => $discovery->{pds}
    );
    $self->set_host( $discovery->{pds} );
    return $http->set_tokens( $token_res->{access_token}, $refresh_token, 'DPoP', $key );
}
214              
# Builds the scope string for a repo collection and action
# (action defaults to 'create').
method collection_scope ( $collection, $action = 'create' ) {
    return sprintf 'repo:%s?action=%s', $collection, $action;
}
218              
219             # Legacy Auth
# Legacy password login through com.atproto.server.createSession.
# Emits a deprecation warning (category 'At'), stores the resulting session
# (or the non-hash result as-is) and, when the session is true, installs its
# tokens on the user agent.
method login( $identifier, $password ) {
    warnings::warnif( At => 'login() (com.atproto.server.createSession) is deprecated. Please use OAuth instead.' );
    my $res = $self->post( 'com.atproto.server.createSession' => { identifier => $identifier, password => $password } );

    # Plain hashes become session objects; anything else is kept untouched.
    $session = ( $res && !builtin::blessed($res) ) ? At::Protocol::Session->new(%$res) : $res;
    return $session unless $session;
    return $http->set_tokens( $session->accessJwt, $session->refreshJwt, undef, undef );
}
227              
# Resumes a previously established session from stored tokens.
#
# Parameters: access and refresh JWTs, token type ('Bearer' by default, or
# 'DPoP'), and for DPoP sessions the private key as a JWK string plus the
# client id, handle, PDS and scope that were stored with the session.
# Returns nothing when the access token cannot be decoded; otherwise the
# result of the refresh or of the user agent's set_tokens.
#
# When the access token has expired but the refresh token has not, the
# session is refreshed. For DPoP this must happen after the session and the
# user agent's tokens are in place, because oauth_refresh reads the current
# session and returns immediately when none is set.
method resume ( $accessJwt, $refreshJwt, $token_type = 'Bearer', $dpop_key_jwk = (), $client_id = (), $handle = (), $pds = (), $scope = () ) {
    my $access  = $self->_decode_token($accessJwt);
    my $refresh = $self->_decode_token($refreshJwt);
    return unless $access;
    my $key;
    if ( $token_type eq 'DPoP' && $dpop_key_jwk ) {
        $key = Crypt::PK::ECC->new();
        $key->import_key( \$dpop_key_jwk );
        $dpop_key = $key;
    }
    my $now           = time;
    my $needs_refresh = $refresh && $now > $access->{exp} && $now < $refresh->{exp};

    # Legacy (Bearer) refresh: unchanged from the previous behaviour.
    if ( $needs_refresh && $token_type ne 'DPoP' ) {
        my $res = $self->post( 'com.atproto.server.refreshSession' => { refreshJwt => $refreshJwt } );
        if   ( $res && !builtin::blessed($res) ) { $session = At::Protocol::Session->new(%$res); }
        else                                     { $session = $res; }
        return $session ? $http->set_tokens( $session->accessJwt, $session->refreshJwt, $token_type, $key ) : $session;
    }
    $session = At::Protocol::Session->new(
        did          => $access->{sub},
        accessJwt    => $accessJwt,
        refreshJwt   => $refreshJwt,
        token_type   => $token_type,
        dpop_key_jwk => $dpop_key_jwk,
        client_id    => $client_id,
        handle       => $handle,
        pds          => $pds,
        scope        => $scope
    );
    $self->set_host($pds) if $pds;
    return $http->set_tokens( $accessJwt, $refreshJwt, $token_type, $key ) unless $needs_refresh;

    # DPoP refresh: install the key and tokens first, then refresh.
    $http->set_tokens( $accessJwt, $refreshJwt, $token_type, $key );
    return $self->oauth_refresh();
}
261              
# Decodes the payload (second segment) of a JWT without verifying it.
# Returns the claims as a hash reference, or nothing when the token is
# undefined, malformed, or its payload is not a JSON object.
method _decode_token ($token) {
    return unless defined $token;
    use MIME::Base64 qw[decode_base64 decode_base64url];
    my ( undef, $payload ) = split /\./, $token;
    return unless defined $payload && length $payload;
    my $claims;
    try {
        # JWT segments are base64url; let the library handle the alphabet
        # and the missing padding.
        $claims = decode_json( decode_base64url($payload) );
    }
    catch ($e) {
        return;
    }

    # Callers read claims such as {exp} and {sub}; anything other than a
    # JSON object would die under strict refs.
    return ref($claims) eq 'HASH' ? $claims : ();
}
275              
276             # XRPC & Lexicons
# Looks up a lexicon definition by name (optionally with a '#fragment').
# Definitions are cached in %lexicons; on a miss the lexicon file is searched
# for in the configured paths, the share directory, the per-user cache and a
# local 'share/lexicons' directory, and finally fetched with _fetch_lexicon.
# Returns the definition, or undef when it cannot be found.
method _locate_lexicon($fqdn) {
    return undef unless defined $fqdn;
    $fqdn =~ s/^#//;    # Strip leading hash
    return $lexicons{$fqdn} if defined $lexicons{$fqdn};

    # The file is named after the lexicon without its fragment.
    my $base_fqdn = $fqdn =~ s[#(.+)$][]r;
    return undef unless $base_fqdn;
    my @namespace = split /\./, $base_fqdn;
    return undef unless @namespace;
    my @relative = ( @namespace[ 0 .. $#namespace - 1 ], $namespace[-1] . '.json' );

    my @search = (
        @lexicon_paths,
        $share->child('lexicons'),
        defined $ENV{HOME} ? path( $ENV{HOME}, '.cache', 'atproto', 'lexicons' ) : (),
        path( 'share', 'lexicons' )
    );
    my $lex_file;
    foreach my $dir (@search) {
        next unless defined $dir;
        my $candidate = $dir->child(@relative);
        next unless $candidate->exists;
        $lex_file = $candidate;
        last;
    }
    $lex_file = $self->_fetch_lexicon($base_fqdn) if !$lex_file;

    if ( $lex_file && $lex_file->exists ) {
        my $json = decode_json $lex_file->slurp_raw;

        # 'main' is reachable both by the bare name and by '#main'.
        for my $def ( keys %{ $json->{defs} } ) {
            my $body = $json->{defs}{$def};
            if ( $def eq 'main' ) {
                $lexicons{$base_fqdn} = $body;
                $lexicons{ $base_fqdn . '#main' } = $body;
            }
            else {
                $lexicons{ $base_fqdn . '#' . $def } = $body;
            }
        }
    }
    return $lexicons{$fqdn};
}
308              
# Downloads a lexicon file from the bluesky-social/atproto repository and
# stores it in the per-user cache ($HOME/.cache/atproto/lexicons, or a
# relative '.cache' directory when HOME is not set).
# Returns the cached file as a path object, or nothing when the download
# produced no usable content.
method _fetch_lexicon($base_fqdn) {
    my @namespace = split /\./, $base_fqdn;
    my @relative  = ( @namespace[ 0 .. $#namespace - 1 ], $namespace[-1] . '.json' );
    my $url       = 'https://raw.githubusercontent.com/bluesky-social/atproto/main/lexicons/' . join( '/', @relative );
    my ($content) = $http->get($url);
    return unless $content && !builtin::blessed($content);

    my $cache_dir = defined $ENV{HOME} ? path( $ENV{HOME}, '.cache', 'atproto', 'lexicons' ) : path( '.cache', 'atproto', 'lexicons' );
    my $lex_file  = $cache_dir->child(@relative);
    $lex_file->parent->mkpath;    # also creates the cache directory itself

    # The user agent may hand back an already decoded structure; serialise it
    # again so the cache always holds JSON text. (Blessed values were rejected
    # above, so the test has to be on ref(), not blessed().)
    $lex_file->spew_raw( ref($content) ? encode_json($content) : $content );
    return $lex_file;
}
324              
# Performs an XRPC query (HTTP GET) against the current host.
# Parameters: the method name, optional arguments and optional headers.
# Checks and records rate-limit data, and coerces the response through the
# method's output schema when its lexicon is known.
# Returns ( content, response headers ) in list context, content otherwise.
method get( $fqdn, $args = (), $headers = {} ) {
    my $lexicon = $self->_locate_lexicon($fqdn);

    # Repo calls are tracked per repo; everything else is 'global'.
    my ( $category, $meta ) = ('global');
    if ( $fqdn =~ /^com\.atproto\.repo\./ ) {
        $category = 'repo';
        $meta     = $args->{repo} // $self->did;
    }
    $self->_ratecheck( $category, $meta );

    my %request = ( headers => $headers );
    $request{content} = $args if defined $args;
    my ( $content, $res_headers ) = $http->get( sprintf( '%s/xrpc/%s', $host, $fqdn ), \%request );
    $self->ratelimit_( $res_headers, $category, $meta );

    $content = $self->_coerce( $fqdn, $lexicon->{output}{schema}, $content ) if $lexicon && !builtin::blessed $content;
    return wantarray ? ( $content, $res_headers ) : $content;
}
338              
# Performs an XRPC procedure (HTTP POST) against the current host.
# Parameters: the method name, optional arguments and optional headers.
# Picks a rate-limit category from the method name, checks and records
# rate-limit data, and coerces the response through the method's output
# schema when its lexicon is known.
# Returns ( content, response headers ) in list context, content otherwise.
method post( $fqdn, $args = (), $headers = {} ) {
    my @namespace = split /\./, $fqdn;
    my $lexicon   = $self->_locate_lexicon($fqdn);

    # Categorize according to bsky specs
    my ( $category, $meta ) = ('global');
    if ( $fqdn =~ /^com\.atproto\.server\.createSession$/ ) {
        ( $category, $meta ) = ( 'auth', $args->{identifier} );
    }
    elsif ( $fqdn =~ /^com\.atproto\.repo\./ ) {
        ( $category, $meta ) = ( 'repo', $args->{repo} // $self->did );
    }
    elsif ( $namespace[-1] =~ m[^(updateHandle|createAccount|deleteAccount|resetPassword)$] ) {
        ( $category, $meta ) = ( $namespace[-1], $args->{did} // $args->{handle} // $args->{email} );
    }
    $self->_ratecheck( $category, $meta );

    my %request = ( headers => $headers );
    $request{content} = $args if defined $args;
    my ( $content, $res_headers ) = $http->post( sprintf( '%s/xrpc/%s', $host, $fqdn ), \%request );
    $self->ratelimit_( $res_headers, $category, $meta );

    $content = $self->_coerce( $fqdn, $lexicon->{output}{schema}, $content ) if $lexicon && !builtin::blessed $content;
    return wantarray ? ( $content, $res_headers ) : $content;
}
# Opens a websocket to the given XRPC subscription on the current host and
# hands incoming data to the callback.
method subscribe( $id, $cb ) {
    my $endpoint = sprintf( '%s/xrpc/%s', $host, $id );
    $self->http->websocket( $endpoint, $cb );
}
368              
# Creates an At::Protocol::Firehose bound to this client. The url is only
# passed on when one was given.
method firehose ( $callback, $url = () ) {
    require At::Protocol::Firehose;
    my @ctor_args = ( at => $self, callback => $callback );
    push @ctor_args, ( url => $url ) if defined $url;
    return At::Protocol::Firehose->new(@ctor_args);
}
373              
374             # Coercion Logic
# Coercion handlers keyed by lexicon schema type. Each handler is invoked by
# _coerce as a method with ( $namespace, $schema, $data ) and returns the
# coerced value. Data whose shape does not match the schema is passed through
# unchanged rather than causing a dereference error.
my %coercions = (
    array => method( $namespace, $schema, $data ) {
        return $data unless ref($data) eq 'ARRAY';
        [ map { $self->_coerce( $namespace, $schema->{items}, $_ ) } @$data ]
    },
    boolean => method( $namespace, $schema, $data ) { !!$data },
    bytes   => method( $namespace, $schema, $data ) {$data},
    blob    => method( $namespace, $schema, $data ) {$data},
    integer => method( $namespace, $schema, $data ) { int $data },
    object  => method( $namespace, $schema, $data ) {
        return $data unless ref($data) eq 'HASH';
        for my ( $name, $subschema )( %{ $schema->{properties} // {} } ) {

            # Only touch properties that are actually present; assigning the
            # result for a missing one would create the key with undef.
            next unless exists $data->{$name};
            $data->{$name} = $self->_coerce( $namespace, $subschema, $data->{$name} );
        }
        $data;
    },
    ref => method( $namespace, $schema, $data ) {
        my $target_namespace = $self->_resolve_namespace( $namespace, $schema->{ref} );
        my $lexicon          = $self->_locate_lexicon($target_namespace);
        return $data unless $lexicon;
        $self->_coerce( $target_namespace, $lexicon, $data );
    },
    union   => method( $namespace, $schema, $data ) {$data},
    unknown => method( $namespace, $schema, $data ) {$data},
    string  => method( $namespace, $schema, $data ) {
        $data // return ();
        if ( defined $schema->{format} ) {
            if    ( $schema->{format} eq 'uri' )    { return URI->new($data); }
            elsif ( $schema->{format} eq 'at-uri' ) { return At::Protocol::URI->new($data); }
            elsif ( $schema->{format} eq 'datetime' ) {

                # Anything containing a non-digit is parsed as a timestamp
                # string; pure digits are treated as epoch seconds.
                return $data =~ /\D/ ? Time::Moment->from_string($data) : Time::Moment->from_epoch($data);
            }
            elsif ( $schema->{format} eq 'did' ) {
                require At::Protocol::DID;
                return At::Protocol::DID->new($data);
            }
            elsif ( $schema->{format} eq 'handle' ) {
                require At::Protocol::Handle;

                # Invalid handles are returned as the plain string.
                try { return At::Protocol::Handle->new($data); }
                catch ($e) { return $data; }
            }
        }
        $data;
    }
);
418              
# Coerces $data according to $schema using the handler registered for the
# schema's type. Returns nothing for undefined data, and the data unchanged
# when the schema is missing, has no type, or has a type without a handler.
method _coerce ( $namespace, $schema, $data ) {
    $data // return ();

    # Methods without an output schema reach this point with an undefined
    # schema; looking that up as a hash key would warn.
    return $data unless ref($schema) eq 'HASH' && defined $schema->{type};
    my $handler = $coercions{ $schema->{type} };
    return $handler ? $handler->( $self, $namespace, $schema, $data ) : $data;
}
424              
# Resolves a lexicon reference $r as seen from the definition named $l.
# References containing a dot are absolute and returned unchanged; anything
# else names a definition in the same lexicon file as $l.
method _resolve_namespace ( $l, $r ) {
    return $r if $r =~ m[\.];    # Absolute (has dots)
    my $base = $l =~ s[#(.+)$][]r;

    # Local references are written '#name'; drop that hash so the result is
    # 'base#name' (the key used when definitions are loaded), not 'base##name'.
    my $fragment = $r =~ s/^#//r;
    return $base . '#' . $fragment;    # Relative to base of current FQDN
}
430              
431             # Identity & Helpers
# DID of the current session as a plain string, or undef without a session.
method did() {
    return undef unless $session;
    return $session->did . '';
}
# Resolves a handle through com.atproto.identity.resolveHandle.
method resolve_handle($handle) {
    my %query = ( handle => $handle );
    return $self->get( 'com.atproto.identity.resolveHandle' => \%query );
}
434              
# Returns the handle advertised in a DID document: the first 'at://' entry of
# alsoKnownAs with the scheme removed. Returns nothing when the document
# cannot be resolved or lists no such entry.
method resolve_did_to_handle ($did) {
    my $doc = $self->resolve_did($did);

    # resolve_did may hand back an error object or a document without
    # aliases; only a plain hash with an alias list is usable.
    return unless ref($doc) eq 'HASH' && ref( $doc->{alsoKnownAs} ) eq 'ARRAY';
    for my $alias ( @{ $doc->{alsoKnownAs} } ) {
        next unless defined $alias;
        return $1 if $alias =~ m[^at://(.+)$];
    }
    return;
}
440              
# Uploads a blob through com.atproto.repo.uploadBlob.
# NOTE(review): the data and Content-Type header are sent as the *arguments*
# of post(), which wraps them in its own { content => ... } request; confirm
# the user agent really expects this nested shape for binary uploads.
method upload_blob ( $data, $mime_type ) {
    my %request = ( content => $data, headers => { 'Content-Type' => $mime_type } );
    return $self->post( 'com.atproto.repo.uploadBlob' => \%request );
}
444              
# Creates a record in the session's own repo via
# com.atproto.repo.createRecord. The record key is only sent when given.
method create_record ( $collection, $record, $rkey = undef ) {
    my %input = ( repo => $self->did, collection => $collection, record => $record );
    $input{rkey} = $rkey if defined $rkey;
    return $self->post( 'com.atproto.repo.createRecord' => \%input );
}
449              
# Deletes a record from the session's own repo via
# com.atproto.repo.deleteRecord.
method delete_record ( $collection, $rkey ) {
    my %input = ( repo => $self->did, collection => $collection, rkey => $rkey );
    return $self->post( 'com.atproto.repo.deleteRecord' => \%input );
}
453              
# Writes a record at a known key in the session's own repo via
# com.atproto.repo.putRecord. swapRecord is only sent when given.
method put_record ( $collection, $rkey, $record, $swapRecord = undef ) {
    my %input = ( repo => $self->did, collection => $collection, rkey => $rkey, record => $record );
    $input{swapRecord} = $swapRecord if defined $swapRecord;
    return $self->post( 'com.atproto.repo.putRecord' => \%input );
}
465              
# Submits a batch of writes to the session's own repo via
# com.atproto.repo.applyWrites. swapCommit is only sent when given.
method apply_writes ( $writes, $swapCommit = undef ) {
    my %input = ( repo => $self->did, writes => $writes );
    $input{swapCommit} = $swapCommit if defined $swapCommit;
    return $self->post( 'com.atproto.repo.applyWrites' => \%input );
}
470              
# Fetches the DID document for a did:plc or did:web identifier.
# did:plc is looked up at plc.directory. did:web is mapped to an HTTPS URL:
# a bare host uses '/.well-known/did.json', while additional colon-separated
# segments form a path that ends in '/did.json'. A percent-encoded port
# ('%3A') in the host is decoded.
# Returns whatever the user agent returns as content, or nothing for other
# DID methods.
method resolve_did ($did) {
    return unless defined $did;
    if ( $did =~ /^did:plc:(.+)$/ ) {
        my ($content) = $http->get( 'https://plc.directory/' . $did );
        return $content;
    }
    elsif ( $did =~ /^did:web:(.+)$/ ) {
        my ( $authority, @segments ) = split /:/, $1;
        return unless defined $authority && length $authority;
        $authority =~ s/%3A/:/gi;    # host%3Aport -> host:port
        my $location = @segments ? '/' . join( '/', @segments ) . '/did.json' : '/.well-known/did.json';
        my ($content) = $http->get( 'https://' . $authority . $location );
        return $content;
    }
    return;
}
484              
# Returns the serviceEndpoint of the first 'AtprotoPersonalDataServer' entry
# in the DID document's service list, or nothing when there is none.
method pds_for_did ($did) {
    my $doc = $self->resolve_did($did);
    return unless $doc && ref $doc eq 'HASH' && $doc->{service};
    foreach my $entry ( @{ $doc->{service} } ) {
        next if $entry->{type} ne 'AtprotoPersonalDataServer';
        return $entry->{serviceEndpoint};
    }
    return;
}
493              
# Derives a libp2p peer id from the first verification method of a DID
# document. Returns nothing when the document or its publicKeyMultibase is
# missing.
# NOTE(review): the key is assumed to be secp256k1 with a 2-byte multicodec
# prefix, as the original comments state; other key types are not detected.
method peer_id_for_did ($did) {
    my $doc = $self->resolve_did($did);
    return unless $doc && ref $doc eq 'HASH' && $doc->{verificationMethod};

    # Look for the primary signing key (usually the first one)
    my $primary   = $doc->{verificationMethod}[0];
    my $multibase = $primary->{publicKeyMultibase} // return;

    # Decode the multibase string into raw bytes (prefix + key).
    require InterPlanetary::Multibase;
    my $decoded = InterPlanetary::Multibase->decode($multibase);

    require Net::Libp2p::Crypto;
    require InterPlanetary::Utils;

    # Skip the multicodec prefix, leaving the key material itself.
    my $key_bytes = substr( $decoded, 2 );

    # libp2p PublicKey message: field 1 (varint) = key type 2 (Secp256k1),
    # field 2 (length-delimited) = key bytes.
    my $type_tag = pack( 'C', ( 1 << 3 ) | 0 );
    my $data_tag = pack( 'C', ( 2 << 3 ) | 2 );
    my $pk_pb    = $type_tag . InterPlanetary::Utils::encode_varint(2);
    $pk_pb .= $data_tag . InterPlanetary::Utils::encode_varint( length($key_bytes) ) . $key_bytes;
    return Net::Libp2p::Crypto->peer_id_from_public_key($pk_pb);
}
# Returns the current session, asking com.atproto.server.getSession for it
# (and remembering the answer) when none is stored yet.
method session() {
    $session = $self->get('com.atproto.server.getSession') unless defined $session;
    return $session;
}
521              
# Fetches a block by CID and returns a Future for its data.
#
# With an IPFS node configured the local blockstore is tried first; when a
# target peer id is given, the block is then requested over Bitswap, falling
# back to HTTP if that fails. Without a node, or without a peer id, the HTTP
# fallback (_get_block_http) is used directly.
method get_block ( $cid_str, $target_peer_id = undef, $did = undef ) {

    # Future is used below but is not loaded anywhere else in this file.
    require Future;
    if ($ipfs_node) {
        my $cid  = InterPlanetary::CID->from_string($cid_str);
        my $data = $ipfs_node->blockstore->get($cid);
        return Future->done($data) if $data;
        if ($target_peer_id) {

            #~ say "[IPFS] Block $cid_str not found locally, attempting Bitswap from $target_peer_id...";
            return $ipfs_node->host->dial( $target_peer_id, '/ipfs/bitswap/1.2.0' )->then(
                sub ($ss) {

                    # The local bitswap handler needs to 'own' this stream to read the response
                    $bitswap->handle_stream($ss);
                    return $bitswap->request_block( $ss, $cid );
                }
            )->else(
                sub {
                    my ($e) = @_;

                    #~ say "[IPFS] Bitswap failed: $e. Falling back to HTTP...";
                    return $self->_get_block_http( $cid_str, $did );
                }
            );
        }
    }
    return $self->_get_block_http( $cid_str, $did );
}
549              
# HTTP fallback for get_block: downloads the block through
# com.atproto.sync.getBlocks (which answers with a CAR file) and extracts the
# matching block.
# Returns a Future that resolves to the block data, or to undef when no DID
# was given, nothing usable was returned, or the CAR file does not contain
# the block. Exceptions raised while fetching or parsing fail the Future.
method _get_block_http ( $cid_str, $did ) {
    require Future;

    # If no DID provided, we can't fallback to sync endpoints
    return Future->done(undef) unless $did;

    # Future->call requires the callback to return a Future; a plain return
    # value would be reported as a failure, so every exit wraps its result.
    return Future->call(
        sub {
            my $car_data = $self->get( 'com.atproto.sync.getBlocks' => { did => $did, cids => [$cid_str] } );
            return Future->done(undef) if !$car_data || builtin::blessed($car_data);
            require Archive::CAR;
            require Archive::CAR::v1;
            my $car = Archive::CAR::v1->new();
            open my $fh, '<', \$car_data or die "Cannot read CAR data from memory: $!";
            $car->read($fh);

            # Archive::CAR blocks is an arrayref of { cid => CIDobj, data => ... }
            for my $block ( @{ $car->blocks } ) {

                # CIDs are compared by string form; a CID encoded with a
                # different multibase than $cid_str will not match.
                return Future->done( $block->{data} ) if $block->{cid}->to_string eq $cid_str;
            }
            return Future->done(undef);
        }
    );
}
# Asks com.atproto.sync.getHead for the given DID's repo head.
method get_repo_head ($did) {
    my %query = ( did => $did );
    return $self->get( 'com.atproto.sync.getHead' => \%query );
}
# Current time as a Time::Moment object.
sub _now {
    return Time::Moment->now;
}
# Formats a number of seconds as '<n> seconds', using the absolute value.
# A false value (0, undef, '') yields '0 seconds'.
method _duration ($seconds) {
    return '0 seconds' unless $seconds;
    return abs($seconds) . ' seconds';
}
582              
# Records the rate-limit headers of a response. Header names are matched
# case-insensitively; nothing is stored when 'ratelimit-limit' is absent.
# With $meta the data is stored under $ratelimits{$type}{$meta}, otherwise it
# replaces $ratelimits{$type}.
method ratelimit_ ( $headers, $type, $meta //= () ) {
    my %lower;
    $lower{ lc $_ } = $headers->{$_} for keys %$headers;
    return unless exists $lower{'ratelimit-limit'};

    my $rate = {
        limit     => $lower{'ratelimit-limit'},
        remaining => $lower{'ratelimit-remaining'},
        reset     => $lower{'ratelimit-reset'},
        policy    => $lower{'ratelimit-policy'},
    };
    if ( defined $meta ) {
        $ratelimits{$type}{$meta} = $rate;
    }
    else {
        $ratelimits{$type} = $rate;
    }
    return $rate;
}
594              
# Warns (category 'At') when the recorded rate-limit data for $type (and
# $meta, when given) shows the limit as exhausted or nearly exhausted.
# Does nothing when no data is recorded or when the recorded window has
# already reset, since those counters are stale.
# NOTE(review): 'reset' is compared against time(), i.e. it is assumed to be
# an epoch timestamp, as in the original comparison.
method _ratecheck( $type, $meta //= () ) {
    my $rate = defined $meta ? $ratelimits{$type}{$meta} : $ratelimits{$type};
    return unless $rate && $rate->{reset};

    # Once the reset time has passed the counters describe a finished window;
    # warning about them would be a false alarm.
    my $now = time;
    return if $now >= $rate->{reset};
    my $remaining = $rate->{remaining} // return;
    if ( $remaining <= 0 ) {
        my $wait = $rate->{reset} - $now;
        warnings::warnif( At => "Rate limit exceeded for $type. Reset in $wait seconds." );
    }
    elsif ( defined $rate->{limit} && $remaining < ( $rate->{limit} * 0.1 ) ) {
        warnings::warnif( At => "Approaching rate limit for $type ($remaining remaining)." );
    }
}
606             }
607             1;
608             __END__