File Coverage

blib/lib/Mastodon/Listener.pm
Criterion Covered Total %
statement 30 72 41.6
branch 0 14 0.0
condition n/a
subroutine 10 17 58.8
pod 0 4 0.0
total 40 107 37.3


line stmt bran cond sub pod time code
1             package Mastodon::Listener;
2              
3 5     5   41 use strict;
  5         12  
  5         156  
4 5     5   30 use warnings;
  5         13  
  5         254  
5              
6             our $VERSION = '0.017';
7              
8 5     5   37 use Moo;
  5         11  
  5         56  
9             with 'Role::EventEmitter';
10              
11 5     5   2052 use Types::Standard qw( Int Str Bool );
  5         12  
  5         40  
12 5     5   4232 use Mastodon::Types qw( Instance to_Status to_Notification );
  5         14  
  5         34  
13 5     5   8496 use IO::Async::Loop;
  5         94907  
  5         182  
14 5     5   4066 use Net::Async::HTTP;
  5         408858  
  5         222  
15 5     5   49 use Try::Tiny;
  5         13  
  5         489  
16 5     5   2654 use JSON::MaybeXS qw( decode_json );
  5         29881  
  5         295  
17              
18 5     5   43 use Log::Any;
  5         12  
  5         61  
19             my $log = Log::Any->get_logger(category => 'Mastodon');
20              
21             has instance => (
               # Server to connect to; values are coerced to the Instance type and
               # the default is the string 'mastodon.cloud'.
22             is => 'ro',
23             isa => Instance,
24             coerce => 1,
25             default => 'mastodon.cloud',
26             );
27              
28             has api_version => (
               # Integer placed after '/api/v' when the default url is built.
29             is => 'ro',
30             isa => Int,
31             default => 1,
32             );
33              
34             has url => (
               # Streaming endpoint. Unless supplied, it is built on first access by
               # concatenating instance, '/api/v' . api_version and
               # '/streaming/' . stream.
35             is => 'ro',
36             lazy => 1,
37             default => sub {
38             $_[0]->instance
39             . '/api/v' . $_[0]->api_version
40             . '/streaming/' . $_[0]->stream;
41             },
42             );
43              
44             has stream => (
               # Name of the stream appended to the default url; 'public' by default.
45             is => 'ro',
46             lazy => 1,
47             default => 'public',
48             );
49              
50             has access_token => (
               # Token sent by start() in the 'Authorization: Bearer ...' header.
               # The constructor requires it.
51             is => 'ro',
52             required => 1,
53             );
54              
55             has _ua => (
               # Private Net::Async::HTTP user agent; init_arg => undef means it
               # cannot be passed to the constructor.
56             is => 'rw',
57             init_arg => undef,
58             default => sub { Net::Async::HTTP->new },
59             );
60              
61             has _future => (
               # Private Future that start() replaces with the pending request and
               # stop() completes.
               # NOTE(review): Future is not loaded explicitly in this file;
               # presumably it comes in via Net::Async::HTTP / IO::Async - confirm.
62             is => 'rw',
63             init_arg => undef,
64             lazy => 1,
65             default => sub { Future->new },
66             );
67              
68             has coerce_entities => (
               # When true, the emit wrapper converts 'update' and 'notification'
               # payloads with to_Status / to_Notification. Off by default.
69             is => 'rw',
70             isa => Bool,
71             lazy => 1,
72             default => 0,
73             );
74              
75             sub BUILD {
               # Moo construction hook: registers the private user agent with an
               # IO::Async::Loop so that its requests can be driven by that loop.
               # $arg (the constructor arguments) is unpacked but not used.
               # NOTE(review): this relies on IO::Async::Loop->new handing back the
               # same loop on later calls - confirm against the IO::Async docs.
76 0     0 0   my ($self, $arg) = @_;
77 0           IO::Async::Loop->new->add($self->_ua);
78             }
79              
80             sub start {
               # Opens the streaming request against $self->url and turns the
               # incoming text into emitted events ('heartbeat', 'delete', 'error',
               # or whatever name follows 'event: '). The request future is stored
               # in _future and then waited on, so this call blocks.
81 0     0 0   my $self = shift;
               # Shared error reporter: emits 'error' with its first two arguments
               # followed by a reference to the remaining argument list.
82 0     0     my $on_error = sub { $self->emit( error => shift, shift, \@_ ) };
  0            
83              
84             $self->_future(
85             $self->_ua->do_request(
86             uri => $self->url,
87             headers => {
88             Authorization => 'Bearer ' . $self->access_token,
89             },
               # Request-level failures are reported with the first argument set
               # to 1.
               # NOTE(review): \@_ is taken here and again inside $on_error, so the
               # data handed to listeners is an array reference wrapped in another
               # array reference - confirm that is intended.
90 0     0     on_error => sub { $on_error->( 1, shift, \@_ ) },
91             on_header => sub {
               # Called with the response once headers are in. An unsuccessful
               # response is reported through $on_error, but execution continues
               # and the chunk handler below is still returned.
92 0     0     my $response = shift;
93 0 0         $on_error->( 1, $response->message, $response )
94             unless $response->is_success;
95              
               # State shared by every invocation of the chunk handler.
96 0           my $current_event;
97 0           my $buffer = '';
98              
99             return sub {
100 0           my $chunk = shift;
101              
102             # We do not have enough data yet, add it to the buffer
103 0 0         unless ( $chunk =~ /\n$/m ) {
104 0           $buffer .= $chunk;
105 0           return;
106             }
107              
                # Prepend whatever was buffered by earlier calls.
108 0           $chunk = $buffer . $chunk;
109              
                # A line is either the ':thump' heartbeat or 'event: NAME'; $2 is
                # only set for the latter.
110 0 0         if ( $chunk =~ /^(:thump|event: (\w+))$/m ) {
111 0           my $line = $1;
112 0           my $event = $2;
113              
114 0 0         unless ( defined $event ) {
115             # Heartbeats have no data
                # NOTE(review): this early return skips the buffer reset at the
                # end of the handler, so any buffered text would be prepended
                # again to the next chunk - confirm this is intended.
116 0           $self->emit( 'heartbeat' );
117 0           return;
118             }
119              
                # Remember the event name and drop its line from the chunk.
                # NOTE(review): $line is interpolated as a pattern without
                # \Q...\E; it can only be 'event: NAME' here, so this is safe
                # today, but quoting would be more defensive.
120 0           $current_event = $event;
121 0           $chunk =~ s/$line\n//;
122             }
123              
                # No /m on this match: the data line has to be at the very start
                # of what is left of the chunk.
124 0 0         if ( $chunk =~ /^data:/ ) {
125 0           $chunk =~ s/^data:\s+//;
126              
                # NOTE(review): $current_event is undef here if no event line has
                # been seen since the last payload, which would warn under
                # 'use warnings' - TODO confirm an event line always comes first.
127 0 0         if ( $current_event eq 'delete' ) {
128             # The payload for delete is a single integer
                # The text after 'data:' is emitted as-is; no chomp or numeric
                # conversion is applied in this handler.
129 0           $self->emit( delete => $chunk );
130             }
131             else {
                # Every other event carries JSON. A decoding failure is reported
                # as an 'error' event with flag 0 and the raw chunk.
132             try {
133 0           my $payload = decode_json $chunk;
134 0           $self->emit( $current_event => $payload );
135             }
136             catch {
137 0           $self->emit( error => 0,
138             "Error decoding JSON payload: $_", $chunk
139             );
140 0           };
141             }
142              
                # The payload has been handled; forget the event name.
143 0           undef $current_event;
144             }
145              
146 0           $buffer = '';
147 0           return;
148             }
149 0           },
150             )
151 0           );
152              
               # Wait on the request future stored above; stop() is what marks it
               # done.
153 0           $self->_future->get;
154             }
155              
156             sub stop {
                # Marks the stored future as done, passing along any arguments,
                # unless it is already ready. Returns the listener so that calls
                # can be chained.
157 0     0 0   my $self = shift;
158 0 0         $self->_future->done(@_) unless $self->_future->is_ready;
159 0           return $self;
160             }
161              
162             sub reset {
                # Restarts the listener: stop() followed by start() on the same
                # object.
163 0     0 0   my $self = shift;
164 0           $self->stop->start;
165             }
166              
167             around emit => sub {
                # Wraps emit so that, when coerce_entities is true, the first data
                # argument of 'notification' and 'update' events is converted with
                # to_Notification / to_Status before the original emit runs.
168             my $orig = shift;
169             my $self = shift;
170              
171             my ($event, $data, @rest) = @_;
                # The outer match is unanchored, so it is only a quick filter; the
                # exact event name is checked with eq on the lines below.
172             if ($event =~ /(update|notification)/ and $self->coerce_entities) {
173             $data = to_Notification($data) if $event eq 'notification';
174             $data = to_Status($data) if $event eq 'update';
175             }
176              
                # Delegate to the wrapped emit with the (possibly coerced) data.
177             $self->$orig($event, $data, @rest);
178             };
179              
180             1;
181              
182             __END__
183              
184             =encoding utf8
185              
186             =head1 NAME
187              
188             Mastodon::Listener - Access the streaming API of a Mastodon server
189              
190             =head1 SYNOPSIS
191              
192             # From Mastodon::Client
193             my $listener = $client->stream( 'public' );
194              
195             # Or use it directly
196             my $listener = Mastodon::Listener->new(
197             url => 'https://mastodon.cloud/api/v1/streaming/public',
198             access_token => $token,
199             coerce_entities => 1,
200             );
201              
202             $listener->on( update => sub {
203             my ($listener, $status) = @_;
204             printf "%s said: %s\n",
205             $status->account->display_name,
206             $status->content;
207             });
208              
209             $listener->start;
210              
211             =head1 DESCRIPTION
212              
213             A Mastodon::Listener object is created by calling the B<stream> method from a
214             L<Mastodon::Client>, and it exists for the sole purpose of parsing a stream of
215             events from a Mastodon server.
216              
217             Mastodon::Listener objects inherit from L<Role::EventEmitter>. Please refer to
218             its documentation for details on how to register callbacks for the different
219             events.
220              
221             Once callbacks have been registered, the listener can be set in motion by
222             calling its B<start> method, which takes no arguments and blocks for as long as the listener is running.
223             The B<stop> method can be called from within callbacks to disconnect from the
224             stream.
225              
226             =head1 ATTRIBUTES
227              
228             =over 4
229              
230             =item B<access_token>
231              
232             The OAuth2 access token of your application. The server may not need it for
233             streaming from public timelines, but the constructor requires a value.
234              
235             =item B<api_version>
236              
237             The API version to use. Defaults to C<1>.
238              
239             =item B<coerce_entities>
240              
241             Whether JSON responses should be coerced into Mastodon::Entity objects.
242             Currently defaults to false (but this will likely change in v0.01).
243              
244             =item B<instance>
245              
246             The instance to use, as a L<Mastodon::Entity::Instance> object. Will be coerced
247             from a URL, and defaults to C<mastodon.cloud>.
248              
249             =item B<stream>
250              
251             The stream to use. Current valid streams are C<public>, C<user>, and tag
252             timelines. To access a tag timeline, the argument to this value should begin
253             with a hash character (C<#>).
254              
255             =item B<url>
256              
257             The full streaming URL to use. By default, it is constructed from the values in
258             the B<instance>, B<api_version>, and B<stream> attributes.
259              
260             =back
261              
262             =head1 EVENTS
263              
264             =over 4
265              
266             =item B<update>
267              
268             A new status has appeared. Callback will be called with the listener and
269             the new status.
270              
271             =item B<notification>
272              
273             A new notification has appeared. Callback will be called with the listener
274             and the new notification.
275              
276             =item B<delete>
277              
278             A status has been deleted. Callback will be called with the listener and the
279             ID of the deleted status.
280              
281             =item B<heartbeat>
282              
283             A new C<:thump> has been received from the server. This is mostly for
284             debugging purposes.
285              
286             =item B<error>
287              
288             Inherited from L<Role::EventEmitter>, will be emitted when an error was found.
289             The callback will be called with a fatal flag, an error message, and any
290             relevant data as a single third argument.
291              
292             If the error event is triggered in response to a 4xx or 5xx error, the data
293             payload will be an array reference with the response and request objects
294             as received from L<Net::Async::HTTP>.
295              
296             =back
297              
298             =head1 AUTHOR
299              
300             =over 4
301              
302             =item *
303              
304             José Joaquín Atria <jjatria@cpan.org>
305              
306             =back
307              
308             =head1 COPYRIGHT AND LICENSE
309              
310             This software is copyright (c) 2017 by José Joaquín Atria.
311              
312             This is free software; you can redistribute it and/or modify it under
313             the same terms as the Perl 5 programming language system itself.
314              
315             =cut