File Coverage

blib/lib/Protocol/XMPP/Stream.pm
Criterion Covered Total %
statement 27 193 13.9
branch 0 38 0.0
condition 0 24 0.0
subroutine 9 52 17.3
pod 28 36 77.7
total 64 343 18.6


line stmt bran cond sub pod time code
1             package Protocol::XMPP::Stream;
2              
3 2     2   176289 use strict;
  2         4  
  2         78  
4 2     2   12 use warnings;
  2         3  
  2         148  
5 2     2   13 use parent qw{Protocol::XMPP::Base};
  2         6  
  2         15  
6              
7             our $VERSION = '0.007'; ## VERSION
8              
9             =head1 NAME
10              
11             Protocol::XMPP::Stream - handle XMPP protocol stream
12              
13             =head1 SYNOPSIS
14              
15             =head1 DESCRIPTION
16              
17             =head1 METHODS
18              
19             =cut
20              
21 2     2   1117 use XML::SAX;
  2         13957  
  2         128  
22 2     2   1010 use XML::LibXML::SAX::ChunkParser;
  2         136192  
  2         126  
23 2     2   1303 use Protocol::XMPP::Handler;
  2         6  
  2         60  
24 2     2   691 use Protocol::XMPP::Message;
  2         5  
  2         68  
25 2     2   1467 use Authen::SASL;
  2         2112  
  2         17  
26 2     2   1047 use MIME::Base64;
  2         1711  
  2         4507  
27              
28             =head2 new
29              
30             Instantiate a stream object. Used for interacting with the underlying XMPP stream.
31              
32             Takes the following parameters as callbacks:
33              
34             =over 4
35              
36             =item * on_queued_write - this will be called whenever there is data queued to be written to the socket
37              
38             =item * on_starttls - this will be called when we want to switch to TLS mode
39              
40             =back
41              
42             and the following scalar parameters:
43              
44             =over 4
45              
46             =item * user - username (not the full JID, just the first part)
47              
48             =item * pass - password
49              
50             =back
51              
52             =cut
53              
54             sub new {
55 0     0 1   my $class = shift;
56 0           my %args = @_;
57              
58 0           my $self = $class->SUPER::new(%args);
59 0           $self->reset;
60 0           $self->{write_buffer} = [];
61 0           $self;
62             }
63              
64             =head2 on_data
65              
66             Data has been received, pass it over to the SAX parser to trigger any required events.
67              
68             =cut
69              
70             sub on_data {
71 0     0 1   my $self = shift;
72 0           my $data = shift;
73 0 0         return $self unless length $data;
74              
75 0           $self->debug("<<< $data");
76 0           $self->{sax}->parse_chunk($data);
77 0           return $self;
78             }
79              
80             =head2 queue_write
81              
82             Queue up a write for this stream. Adds to the existing send buffer array if there is one.
83              
84             When a write is queued, this will send a notification to the on_queued_write callback if one
85             was defined.
86              
87             =cut
88              
89             sub queue_write {
90 0     0 1   my $self = shift;
91 0           my $v = shift;
92 0           $self->debug("Queued a write for [$v]");
93 0           my $f = $self->new_future;
94 0           push @{$self->{write_buffer}}, [ $v, $f ];
  0            
95 0 0         $self->{on_queued_write}->() if $self->{on_queued_write};
96 0           return $f;
97             }
98              
99             =head2 write_buffer
100              
101             Returns the contents of the current write buffer without changing it.
102              
103             =cut
104              
105 0     0 1   sub write_buffer { shift->{write_buffer} }
106              
107             =head2 extract_write
108              
109             Retrieves next pending message from the write buffer and removes it from the list.
110              
111             =cut
112              
113             sub extract_write {
114 0     0 1   my $self = shift;
115 0 0         return unless @{$self->{write_buffer}};
  0            
116 0           my $next = shift @{$self->{write_buffer}};
  0            
117 0           my ($v) = @$next;
118 0           $self->debug("Extract write [$v]");
119 0           return $v;
120             }
121              
122             sub extract_write_and_future {
123 0     0 0   my $self = shift;
124 0 0         return unless @{$self->{write_buffer}};
  0            
125 0           my $next = shift @{$self->{write_buffer}};
  0            
126 0           $self->debug("Extract write [$next->[0]]");
127 0           return $next;
128             }
129              
130             =head2 ready_to_send
131              
132             Returns true if there's data ready to be written.
133              
134             =cut
135              
136             sub ready_to_send {
137 0     0 1   my $self = shift;
138 0           $self->debug('Check whether ready to send, current length '. @{$self->{write_buffer}});
  0            
139 0           return @{$self->{write_buffer}};
  0            
140             }
141              
142             sub features_complete {
143 0     0 0   my $self = shift;
144 0   0       $self->{features_complete} ||= $self->new_future;
145             }
146              
147             =head2 reset
148              
149             Reset this stream.
150              
151             Clears out the existing SAX parsing information and sets up a new L ready to accept
152             events. Used when we expect a new C<> element, for example after authentication or TLS upgrade.
153              
154             =cut
155              
156             sub reset {
157 0     0 1   my $self = shift;
158 0           $self->debug('Reset stream');
159 0           delete $self->{remote_opened};
160 0           delete $self->{features_complete};
161 0           my $handler = Protocol::XMPP::Handler->new(
162             stream => $self # this will be converted to a weak ref to self...
163             );
164 0           $self->{handler} = $handler; # ... but we keep a strong ref to the handler since we control it
165              
166             # We need to be able to handle document fragments, so we specify a SAX parser here.
167             # TODO If ChunkParser advertised fragment handling as a feature we could require that
168             # rather than hardcoding the parser type here.
169             {
170 0           local $XML::SAX::ParserPackage = 'XML::LibXML::SAX::ChunkParser';
  0            
171 0 0         $self->{sax} = XML::SAX::ParserFactory->parser(Handler => $self->{handler}) or die "No SAX parser could be found";
172             };
173 0           $self->{data} = '';
174 0           return $self;
175             }
176              
177             =head2 dispatch_event
178              
179             Call the appropriate event handler.
180              
181             Currently defined events:
182              
183             =over 4
184              
185             =item * features - we have received the features list from the server
186              
187             =item * login - login was completed successfully
188              
189             =item * message - a message was received
190              
191             =item * presence - a presence notification was received
192              
193             =item * subscription - a presence notification was received
194              
195             =item * transfer_request - a file transfer request has been received
196              
197             =item * file - a file was received
198              
199             =back
200              
201             =cut
202              
203             sub dispatch_event {
204 0     0 1   my $self = shift;
205 0           my $type = shift;
206 0           my $method = 'on_' . $type;
207 0   0       my $sub = $self->{$method} || $self->can($method);
208 0 0         return $sub->($self, @_) if $sub;
209 0           $self->debug("No method found for $method");
210             }
211              
212             =head2 preamble
213              
214             Returns the XML header and opening stream preamble.
215              
216             =cut
217              
218             sub preamble {
219 0     0 1   my $self = shift;
220             # TODO yeah fix this
221             return [
222 0           qq{},
223             q{}
224             ];
225             }
226              
227             =head2 jid
228              
229             Returns the full JID for our user.
230              
231             If given a parameter, will set the JID to that value, extracting hostname and user by splitting the domain.
232              
233             =cut
234              
235             sub jid {
236 0     0 1   my $self = shift;
237 0 0         if(@_) {
238 0           $self->{jid} = shift;
239 0           ($self->{user}, $self->{hostname}) = split /\@/, $self->{jid}, 2;
240 0 0         ($self->{hostname}, $self->{resource}) = split qr{/}, $self->{hostname}, 2 if index($self->{hostname}, '/') >= 0;
241 0           return $self;
242             }
243 0           return $self->{jid};
244             }
245              
246             =head2 user
247              
248             Username for SASL authentication.
249              
250             =cut
251              
252 0     0 1   sub user { shift->{user} }
253              
254             =head2 pass
255              
256             Password for SASL authentication.
257              
258             =cut
259              
260 0     0 1   sub pass { shift->{pass} }
261              
262             =head2 hostname
263              
264             Name of the host
265              
266             =cut
267              
268 0     0 1   sub hostname { shift->{hostname} }
269              
270             =head2 resource
271              
272             Fragment used to differentiate this client from any other active clients for this user (as defined by bare JID).
273              
274             =cut
275              
276 0     0 1   sub resource { shift->{resource} }
277              
278             =head2 write_xml
279              
280             Write a chunk of XML to the stream, converting from the internal representation to XML
281             text stanzas.
282              
283             =cut
284              
285             sub write_xml {
286 0     0 1   my $self = shift;
287 0           $self->queue_write($self->_ref_to_xml(@_));
288             }
289              
290             =head2 write_text
291              
292             Write raw text to the output stream.
293              
294             =cut
295              
296             sub write_text {
297 0     0 1   my $self = shift;
298 0           $self->queue_write($_) for @_;
299             }
300              
301             =head2 login
302              
303             Process the login.
304              
305             Takes optional named parameters:
306              
307             =over 4
308              
309             =item * user - username (not the full JID, just the user part)
310              
311             =item * password - password or passphrase to use in SASL authentication
312              
313             =back
314              
315             =cut
316              
317             sub login {
318 0     0 1   my $self = shift;
319 0           my %args = @_;
320              
321 0   0       my $user = delete $args{user} // $self->user;
322 0   0       my $pass = delete $args{password} // $self->pass;
323              
324             my $sasl = Authen::SASL->new(
325             mechanism => $self->{features}->_sasl_mechanism_list,
326             callback => {
327 0     0     pass => sub { $pass },
328 0     0     user => sub { $user },
329             }
330 0           );
331              
332 0           my $s = $sasl->client_new(
333             'xmpp',
334             $self->hostname,
335             0
336             );
337 0           $self->{features}->{sasl_client} = $s;
338 0           my $msg = $s->client_start;
339 0           my $mech = $s->mechanism;
340 0 0 0       if(defined($msg) && length($msg)) {
341 0           $self->debug("Have initial message");
342 0           $msg = MIME::Base64::encode($msg, ''); # no linebreaks
343             }
344              
345 0           my $f = $self->new_future;
346             $self->subscribe_to_event(
347             login_success => sub {
348 0     0     my ($ev) = @_;
349             # We only wanted a one-shot notification here
350 0           $ev->unsubscribe;
351 0           $f->done
352             }
353 0           );
354 0           $self->debug("SASL mechanism: " . $mech);
355 0 0         $self->queue_write(
356             $self->_ref_to_xml(
357             [
358             'auth',
359             '_ns' => 'xmpp-sasl',
360             mechanism => $mech,
361             $msg
362             ? (_content => $msg)
363             : ()
364             ]
365             )
366             );
367 0           return $f;
368             }
369              
370             sub pending_iq {
371 0     0 0   my ($self, $id, $f) = @_;
372 0 0         die "IQ request $id already exists" if exists $self->{pending_iq}{$id};
373 0           $self->{pending_iq}{$id} = $f;
374 0           $self
375             }
376              
377             sub iq_complete {
378 0     0 0   my ($self, $id, $iq) = @_;
379 0 0         die "IQ request $id not found" unless exists $self->{pending_iq}{$id};
380 0           $self->{pending_iq}{$id}->done($iq);
381 0           $self
382             }
383              
384             =head2 is_authorised
385              
386             Returns true if we are authorised already.
387              
388             =cut
389              
390             sub is_authorised {
391 0     0 1   my $self = shift;
392 0 0         if(@_) {
393 0           my $state = shift;
394 0           $self->{authorised} = $state;
395 0 0         $self->dispatch_event($state ? 'authorised' : 'unauthorised');
396 0 0         if($state) {
397 0           my $f;
398             $f = Future->needs_all(
399             $self->remote_opened,
400             $self->features_complete,
401             )->on_done(sub {
402 0     0     $self->login_complete->done;
403 0           $self->invoke_event(login_success => );
404 0     0     })->on_ready(sub { undef $f });
  0            
405             } else {
406 0           $self->login_complete->fail;
407 0           $self->invoke_event(login_fail => );
408             }
409 0           return $self;
410             }
411 0           return $self->{authorised};
412             }
413              
414             sub login_complete {
415 0     0 0   my $self = shift;
416 0   0       $self->{login_complete} ||= $self->new_future
417             }
418              
419             =head2 is_loggedin
420              
421             Returns true if we are logged in already.
422              
423             =cut
424              
425             sub is_loggedin {
426 0     0 1   my $self = shift;
427 0 0         if(@_) {
428 0           my $state = shift;
429 0           $self->{loggedin} = $state;
430 0 0         $self->dispatch_event($state ? 'login' : 'logout');
431 0           return $self;
432             }
433 0           return $self->{loggedin};
434             }
435              
436             =head2 stream
437              
438             Override the ->stream method from the base class so that we pick up our own methods directly.
439              
440             =cut
441              
442 0     0 1   sub stream { shift }
443              
444             =head2 next_id
445              
446             Returns the next ID in the sequence for outgoing requests.
447              
448             =cut
449              
450             sub next_id {
451 0     0 1   my $self = shift;
452 0 0         unless($self->{request_id}) {
453 0           $self->{request_id} = 'pxa0001';
454             }
455 0           return $self->{request_id}++;
456             }
457              
458             =head2 on_tls_complete
459              
460             Continues the next part of the connection when TLS is complete.
461              
462             =cut
463              
464             sub on_tls_complete {
465 0     0 1   my $self = shift;
466 0           delete $self->{tls_pending};
467 0           $self->reset;
468 0           $self->write_text($_) for @{$self->preamble};
  0            
469             }
470              
471             =head2 compose
472              
473             Compose a new outgoing message.
474              
475             =cut
476              
477             sub compose {
478 0     0 1   my $self = shift;
479 0           my %args = @_;
480 0           return Protocol::XMPP::Message->new(
481             stream => $self,
482             %args
483             );
484             }
485              
486             =head2 subscribe
487              
488             Subscribe to a new contact. Takes a single JID as target.
489              
490             =cut
491              
492             sub subscribe {
493 0     0 1   my $self = shift;
494 0           my $to = shift;
495 0           Protocol::XMPP::Contact->new(
496             stream => $self,
497             jid => $to,
498             )->subscribe;
499             }
500              
501             =head2 unsubscribe
502              
503             Unsubscribe from the given contact. Takes a single JID as target.
504             =cut
505              
506             sub unsubscribe {
507 0     0 1   my $self = shift;
508 0           my $to = shift;
509 0           Protocol::XMPP::Contact->new(
510             stream => $self,
511             jid => $to,
512             )->unsubscribe;
513             }
514              
515             =head2 authorise
516              
517             Grant authorisation to the given contact. Takes a single JID as target.
518              
519             =cut
520              
521             sub authorise {
522 0     0 1   my $self = shift;
523 0           my $to = shift;
524 0           Protocol::XMPP::Contact->new(
525             stream => $self,
526             jid => $to,
527             )->authorise;
528             }
529              
530             =head2 deauthorise
531              
532             Revokes auth for the given contact. Takes a single JID as target.
533              
534             =cut
535              
536             sub deauthorise {
537 0     0 1   my $self = shift;
538 0           my $to = shift;
539 0           Protocol::XMPP::Contact->new(
540             stream => $self,
541             jid => $to,
542             )->deauthorise;
543             }
544              
545             sub presence {
546 0     0 1   my ($self, %args) = @_;
547             $self->write_xml([
548             'presence',
549             (exists $args{to}
550             ? (to => $args{to})
551             : ()
552             ),
553             _content => [
554             #[
555             # 'show', _content => $args{show} || 'available',
556             #],
557             [
558             'status', _content => $args{status}
559 0 0         ]]
560             ]);
561             }
562              
563             sub remote_opened {
564 0     0 0   my $self = shift;
565 0   0       $self->{remote_opened} ||= $self->new_future
566             }
567              
568             sub remote_closed {
569 0     0 0   my $self = shift;
570 0   0       $self->{remote_closed} ||= $self->new_future
571             }
572              
573             sub close {
574 0     0 0   my $self = shift;
575             $self->remote_opened->then(sub {
576 0     0     $self->queue_write(
577             ''
578             )
579             })->then(sub {
580 0     0     $self->remote_closed
581 0           });
582             }
583              
584             1;
585              
586             __END__