File Coverage

blib/lib/Net/SIP/Simple/RTP.pm
Criterion Covered Total %
statement 176 217 81.1
branch 61 106 57.5
condition 33 61 54.1
subroutine 15 15 100.0
pod 2 2 100.0
total 287 401 71.5


line stmt bran cond sub pod time code
1             ###########################################################################
2             # Net::SIP::Simple::RTP
3             # implements some RTP behaviors
4             # - media_recv_echo: receive and echo data with optional delay back
5             # can save received data
6             # - media_send_recv: receive and optionally save data. Sends back data
7             # from file with optional repeat count
8             ###########################################################################
9              
10 43     43   241 use strict;
  43         85  
  43         1053  
11 43     43   170 use warnings;
  43         73  
  43         1323  
12              
13             package Net::SIP::Simple::RTP;
14              
15 43     43   213 use Net::SIP::Util qw(invoke_callback ip_sockaddr2parts ip_parts2string);
  43         69  
  43         2007  
16 43     43   206 use Socket;
  43         67  
  43         19849  
17 43     43   278 use Net::SIP::Debug;
  43         76  
  43         218  
18 43     43   16844 use Net::SIP::DTMF;
  43         120  
  43         2460  
19 43     43   283 use Net::SIP::Dispatcher::Eventloop;
  43         85  
  43         2250  
20              
21              
22             # on MSWin32 non-blocking sockets are not supported from IO::Socket
23 43     43   242 use constant CAN_NONBLOCKING => $^O ne 'MSWin32';
  43         79  
  43         94002  
24              
25             ###########################################################################
26             # creates function which will initialize Media for echo back
27             # Args: ($writeto,$delay)
28             # $delay: how much packets delay between receive and echo back (default 0)
29             # if <0 no ddata will be send back (e.g. recv only)
30             # $writeto: where to save received data (default: don't save)
31             # Returns: [ \&sub,@args ]
32             ###########################################################################
33             sub media_recv_echo {
34 57     57 1 187 my ($writeto,$delay) = @_;
35              
36             my $sub = sub {
37 32     32   103 my ($delay,$writeto,$call,$args) = @_;
38              
39 32         91 my $lsocks = $args->{media_lsocks};
40 32   33     332 my $ssocks = $args->{media_ssocks} || $lsocks;
41 32         262 my $raddr = $args->{media_raddr};
42 32         142 my $mdtmf = $args->{media_dtmfxtract};
43 32         82 my $didit = 0;
44 32         160 for( my $i=0;$i<@$lsocks;$i++ ) {
45 32   50     133 my $sock = $lsocks->[$i] || next;
46 32 50       163 $sock = $sock->[0] if UNIVERSAL::isa( $sock,'ARRAY' );
47 32         69 my $s_sock = $ssocks->[$i];
48 32 50       180 $s_sock = $s_sock->[0] if UNIVERSAL::isa( $s_sock,'ARRAY' );
49              
50 32         74 my $addr = $raddr->[$i];
51 32 100       155 $addr = $addr->[0] if ref($addr);
52              
53 32         76 my @delay_buffer;
54 32         56 my $channel = $i;
55             my $echo_back = sub {
56 1700         7739 my ($s_sock,$remote,$delay_buffer,$delay,$writeto,$targs,$didit,$sock) = @_;
57             {
58 1700 100       4159 my ($buf,$mpt,$seq,$tstamp,$ssrc,$csrc) =
  3452         13627  
59             _receive_rtp($sock,$writeto,$targs,$didit,$channel)
60             or last;
61             #DEBUG( "$didit=$$didit" );
62 1902         5446 $$didit = 1;
63              
64 1902 100 66     10223 last if ! $s_sock || ! $remote; # call on hold ?
65              
66 1852         7380 my @pkt = _generate_dtmf($targs,$seq,$tstamp,0x1234);
67 1852 50 33     7253 if (@pkt && $pkt[0] ne '') {
68 0         0 DEBUG( 100,"send DTMF to RTP");
69 0         0 send( $s_sock,$_,0,$remote ) for(@pkt);
70 0         0 return; # send DTMF *instead* of echo data
71             }
72              
73 1852 100       5826 last if $delay<0;
74 1752         4435 push @$delay_buffer, $buf;
75 1752         5047 while ( @$delay_buffer > $delay ) {
76 1752         155008 send( $s_sock,shift(@$delay_buffer),0,$remote );
77             }
78 1752         7487 CAN_NONBLOCKING && redo; # try recv again
79             }
80 32         608 };
81              
82             $call->{loop}->addFD($sock, EV_READ,
83             [ $echo_back,$s_sock,$addr,\@delay_buffer,$delay || 0,$writeto,{
84             dtmf_gen => $args->{dtmf_events},
85             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
86 32   100     1047 && [ $mdtmf->[$i], $args->{cb_dtmf} ],
      50        
87             },\$didit ],
88             'rtp_echo_back' );
89 32         212 my $reset_to_blocking = CAN_NONBLOCKING && $s_sock->blocking(0);
90 32         505 push @{ $call->{ rtp_cleanup }}, [ sub {
91 32         91 my ($call,$sock,$rb) = @_;
92 32         192 DEBUG( 100,"rtp_cleanup: remove socket %d",fileno($sock));
93 32         218 $call->{loop}->delFD( $sock );
94 32 50       283 $sock->blocking(1) if $rb;
95 32         636 }, $call,$sock,$reset_to_blocking ];
96             }
97              
98             # on RTP inactivity for at least 10 seconds close connection
99             my $timer = $call->{dispatcher}->add_timer( 10,
100             [ sub {
101 0         0 my ($call,$didit,$timer) = @_;
102 0 0       0 if ( $$didit ) {
103 0         0 $$didit = 0;
104             } else {
105 0         0 DEBUG(10, "closing call because if inactivity" );
106 0         0 $call->bye;
107 0         0 $timer->cancel;
108             }
109 32         546 }, $call,\$didit ],
110             10,
111             'rtp_inactivity',
112             );
113 32         845 push @{ $call->{ rtp_cleanup }}, [
114             sub {
115 32         191 shift->cancel;
116 32         103 DEBUG( 100,"cancel RTP timer" );
117             },
118 32         81 $timer
119             ];
120 57         1547 };
121              
122 57         569 return [ $sub,$delay,$writeto ];
123             }
124              
125             ###########################################################################
126             # creates function which will initialize Media for saving received data
127             # into file and sending data from another file
128             # Args: ($readfrom;$repeat,$writeto)
129             # $readfrom: where to read data for sending from (filename or callback
130             # which returns payload)
131             # $repeat: if <= 0 the data in $readfrom will be send again and again
132             # if >0 the data in $readfrom will be send $repeat times
133             # $writeto: where to save received data (undef == don't save), either
134             # filename or callback which gets packet as argument
135             # Returns: [ \&sub,@args ]
136             ###########################################################################
137             sub media_send_recv {
138 16     16 1 60 my ($readfrom,$repeat,$writeto) = @_;
139              
140             my $sub = sub {
141 16     16   60 my ($writeto,$readfrom,$repeat,$call,$args) = @_;
142              
143 16         65 my $lsocks = $args->{media_lsocks};
144 16   33     144 my $ssocks = $args->{media_ssocks} || $lsocks;
145 16         46 my $raddr = $args->{media_raddr};
146 16         74 my $mdtmf = $args->{media_dtmfxtract};
147 16         42 my $didit = 0;
148 16         75 for( my $i=0;$i<@$lsocks;$i++ ) {
149 16         35 my $channel = $i;
150 16         38 my $sock = $lsocks->[$i];
151 16         38 my ($timer,$reset_to_blocking);
152              
153             # recv once I get an event on RTP socket
154 16 50       64 if ($sock) {
155 16 50       76 $sock = $sock->[0] if UNIVERSAL::isa( $sock,'ARRAY' );
156             my $receive = sub {
157 3175         11137 my ($writeto,$targs,$didit,$sock) = @_;
158 3175         6869 while (1) {
159 6629         20132 my $buf = _receive_rtp($sock,$writeto,$targs,$didit,$channel);
160 6629 100       30497 defined($buf) or return;
161 3454         6029 CAN_NONBLOCKING or return;
162             }
163 16         277 };
164             $call->{loop}->addFD($sock, EV_READ,
165             [
166             $receive,
167             $writeto,
168             {
169             dtmf_gen => $args->{dtmf_events},
170             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
171 16   50     576 && [ $mdtmf->[$i], $args->{cb_dtmf} ],
172             },
173             \$didit
174             ],
175             'rtp_receive'
176             );
177 16         142 $reset_to_blocking = CAN_NONBLOCKING && $sock->blocking(0);
178             }
179              
180             # sending need to be done with a timer
181             # ! $addr == call on hold
182 16         426 my $addr = $raddr->[$i];
183 16 50       87 $addr = $addr->[0] if ref($addr);
184 16 50 33     951 if ($addr and my $s_sock = $ssocks->[$i]) {
185 16 50       120 $s_sock = $s_sock->[0] if UNIVERSAL::isa( $s_sock,'ARRAY' );
186 16   100     141 my $cb_done = $args->{cb_rtp_done} || sub { shift->bye };
187             $timer = $call->{dispatcher}->add_timer(
188             0, # start immediately
189             [ \&_send_rtp,$s_sock,$call->{loop},$addr,$readfrom,$channel, {
190             repeat => $repeat || 1,
191 12         55 cb_done => [ sub { invoke_callback(@_) }, $cb_done, $call ],
192             rtp_param => $args->{rtp_param},
193             dtmf_gen => $args->{dtmf_events},
194             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
195             && [ $mdtmf->[$i], $args->{cb_dtmf} ],
196             }],
197 16   100     755 $args->{rtp_param}[2], # repeat timer
      50        
198             'rtpsend',
199             );
200             }
201              
202 16         252 push @{ $call->{rtp_cleanup}}, [ sub {
203 16         55 my ($call,$sock,$timer,$rb) = @_;
204 16 50       78 if ($sock) {
205 16         106 $call->{loop}->delFD($sock);
206 16 50       116 $sock->blocking(1) if $rb;
207             }
208 16 50       380 $timer->cancel() if $timer;
209 16         37 }, $call,$sock,$timer,$reset_to_blocking ];
210             }
211              
212             # on RTP inactivity for at least 10 seconds close connection
213             my $timer = $call->{dispatcher}->add_timer( 10,
214             [ sub {
215 0         0 my ($call,$args,$didit,$timer) = @_;
216 0 0       0 if ( $$didit ) {
217 0         0 $$didit = 0;
218             } else {
219 0         0 DEBUG( 10,"closing call because if inactivity" );
220 0         0 $call->bye;
221 0         0 $timer->cancel;
222             }
223 16         300 }, $call,$args,\$didit ],
224             10,
225             'rtp_inactivity',
226             );
227 16         34 push @{ $call->{ rtp_cleanup }}, [ sub { shift->cancel }, $timer ];
  16         1126  
  16         53  
228 16         425 };
229              
230 16         224 return [ $sub,$writeto,$readfrom,$repeat ];
231             }
232              
233             ###########################################################################
234             # Helper to receive RTP and optionally save it to file
235             # Args: ($sock,$writeto,$targs,$didit,$channel)
236             # $sock: RTP socket
237             # $writeto: filename for saving or callback which gets packet as argument
238             # $targs: \%hash to hold state info between calls of this function
239             # $didit: reference to scalar which gets set to TRUE on each received packet
240             # and which gets set to FALSE from a timer, thus detecting inactivity
241             # $channel: index of RTP channel
242             # Return: $packet
243             # $packet: received RTP packet (including header)
244             ###########################################################################
245             sub _receive_rtp {
246 10081     10081   29725 my ($sock,$writeto,$targs,$didit,$channel) = @_;
247              
248 10081         134372 my $from = recv( $sock,my $buf,2**16,0 );
249 10081 100 66     82772 return if ! $from || !defined($buf) || $buf eq '';
      66        
250 5356         32935 DEBUG( 50,"received %d bytes from RTP", length($buf));
251              
252 5356         13932 if(0) {
253             DEBUG( "got data on socket %d %s from %s",fileno($sock),
254             ip_sockaddr2string(getsockname($sock)),
255             ip_sockaddr2string($from));
256             }
257              
258 5356         14655 $$didit = 1;
259 5356         15851 my $packet = $buf;
260              
261 5356         39108 my ($vpxcc,$mpt,$seq,$tstamp,$ssrc) = unpack( 'CCnNN',substr( $buf,0,12,'' ));
262 5356         16586 my $version = ($vpxcc & 0xc0) >> 6;
263 5356 50       15478 if ( $version != 2 ) {
264 0         0 DEBUG( 100,"RTP version $version" );
265             return
266 0         0 }
267             # skip csrc headers
268 5356         12493 my $cc = $vpxcc & 0x0f;
269 5356   33     14819 my $csrc = $cc && substr( $buf,0,4*$cc,'' );
270              
271             # skip extension header
272 5356 50       15779 my $xh = $vpxcc & 0x10 ? (unpack( 'nn', substr( $buf,0,4,'' )))[1] : 0;
273 5356 50       12385 substr( $buf,0,4*$xh,'' ) if $xh;
274              
275             # ignore padding
276 5356 50       11003 my $padding = $vpxcc & 0x20 ? unpack( 'C', substr($buf,-1,1)) : 0;
277 5356 50       14776 my $payload = $padding ? substr( $buf,0,length($buf)-$padding ): $buf;
278              
279 5356         25843 DEBUG( 100,"ch=%d payload=%d/%d pt=%d xh=%d padding=%d cc=%d",
280             $channel, $seq, length($payload), $mpt & 0x7f, $xh, $padding, $cc);
281 5356 50 66     35998 if ( $targs->{rseq} && $seq<= $targs->{rseq}
      33        
282             && $targs->{rseq} - $seq < 60000 ) {
283 0         0 DEBUG( 10,"seq=$seq last=$targs->{rseq} - dropped" );
284 0         0 return;
285             }
286 5356         12964 $targs->{rseq} = $seq;
287              
288 5356 100       20425 if ( ref($writeto)) {
    50          
289             # callback
290 1951         9576 invoke_callback($writeto,$payload,$seq,$tstamp,$channel,$mpt & 0x7f);
291             } elsif ( $writeto ) {
292             # save into file
293 0         0 my $fd = $targs->{fdr};
294 0 0       0 if ( !$fd ) {
295 0 0       0 open( $fd,'>',$writeto ) || die $!;
296 0         0 $targs->{fdr} = $fd
297             }
298 0         0 syswrite($fd,$payload);
299             }
300              
301 5356 100       41723 if ( my $xt = $targs->{dtmf_xtract} ) {
302 4207         11511 my ($sub,$cb) = @$xt;
303 4207 100       16538 if ( my ($event,$duration) = $sub->($packet)) {
304 72         495 DEBUG(40,"received dtmf <$event,$duration>");
305 72         350 $cb->($event,$duration);
306             }
307             }
308              
309 5356 100       31595 return wantarray ? ( $packet,$mpt,$seq,$tstamp,$ssrc,$csrc ): $packet;
310             }
311              
312             ###########################################################################
313             # Helper to read RTP data from file (PCMU 8000) and send them through
314             # the RTP socket
315             # Args: ($sock,$loop,$addr,$readfrom,$targs,$timer)
316             # $sock: RTP socket
317             # $loop: event loop (used for looptime for timestamp)
318             # $addr: where to send data
319             # $readfrom: filename for reading or callback which will return payload
320             # $channel: index of RTP channel
321             # $targs: \%hash to hold state info between calls of this function
322             # especially 'repeat' holds the number of times this data has to be
323             # send (<=0 means forever) and 'cb_done' holds a [\&sub,@arg] callback
324             # to end the call after sending all data
325             # 'repeat' makes only sense if $readfrom is filename
326             # $timer: timer which gets canceled once all data are send
327             # Return: NONE
328             ###########################################################################
329             sub _send_rtp {
330 3617     3617   12764 my ($sock,$loop,$addr,$readfrom,$channel,$targs,$timer) = @_;
331              
332 3617         9892 $targs->{wseq}++;
333 3617         7578 my $seq = $targs->{wseq};
334              
335             # 32 bit timestamp based on seq and packet size
336 3617         15795 my $timestamp = ( $targs->{rtp_param}[1] * $seq ) % 2**32;
337              
338 3617         14543 my @pkt = _generate_dtmf($targs,$seq,$timestamp,0x1234);
339 3617 100 100     17981 if (@pkt && $pkt[0] ne '') {
340 1305         5659 DEBUG( 100,"send DTMF to RTP");
341 1305         349103 send( $sock,$_,0,$addr ) for(@pkt);
342 1305         10616 return;
343             }
344              
345 2312         8785 my $buf;
346             my $rtp_event;
347 2312         0 my $payload_type;
348              
349 2312 50       8475 if ( ref($readfrom) ) {
350             # payload by callback
351 2312         8931 $buf = invoke_callback($readfrom,$seq,$channel);
352 2312 100       44591 if ( !$buf ) {
353 12         92 DEBUG( 50, "no more data from callback" );
354 12 50       78 $timer && $timer->cancel;
355 12         41 invoke_callback( $targs->{cb_done} );
356 12         101 return;
357             }
358 2300 50       6800 ($buf,$payload_type,$rtp_event,$timestamp) = @$buf if ref($buf);
359             } else {
360             # read from file
361 0         0 for(my $tries = 0; $tries<2;$tries++ ) {
362 0   0     0 $targs->{wseq} ||= int( rand( 2**16 ));
363 0         0 my $fd = $targs->{fd};
364 0 0       0 if ( !$fd ) {
365 0 0       0 $targs->{repeat} = -1 if $targs->{repeat} < 0;
366 0 0       0 if ( $targs->{repeat} == 0 ) {
367             # no more sending
368 0         0 DEBUG( 50, "no more data from file" );
369 0 0       0 $timer && $timer->cancel;
370 0         0 invoke_callback( $targs->{cb_done} );
371 0         0 return;
372             }
373              
374 0 0       0 open( $fd,'<',$readfrom ) || die $!;
375 0         0 $targs->{fd} = $fd;
376             }
377 0         0 my $size = $targs->{rtp_param}[1]; # 160 for PCMU/8000
378 0 0       0 last if read( $fd,$buf,$size ) == $size;
379             # try to reopen file
380 0         0 close($fd);
381 0         0 $targs->{fd} = undef;
382 0         0 $targs->{repeat}--;
383             }
384             }
385              
386 2300 50 33     14873 die $! if ! defined $buf or $buf eq '';
387 2300         4632 if (0) {
388             DEBUG(50, "%s -> %s seq=%d ts=%x",
389             ip_sockaddr2string(getsockname($sock)),
390             ip_sockaddr2string($addr),
391             $seq, $timestamp
392             );
393             }
394              
395             # add RTP header
396 2300 50       7203 $rtp_event = 0 if ! defined $rtp_event;
397 2300 50 50     12788 $payload_type = $targs->{rtp_param}[0]||0 # 0 == PMCU 8000
398             if ! defined $payload_type;
399              
400 2300         17050 my $header = pack('CCnNN',
401             0b10000000, # Version 2
402             $payload_type | ( $rtp_event << 7 ) ,
403             $seq, # sequence
404             $timestamp,
405             0x1234, # source ID
406             );
407 2300         10212 DEBUG( 100,"send %d bytes to RTP", length($buf));
408 2300         548823 send( $sock,$header.$buf,0,$addr );
409             }
410              
411             ###########################################################################
412             # Helper to send DTMF
413             # Args: ($targs,$seq,$timestamp,$srcid)
414             # $targs: hash which is shared with _send_rtp and other callbacks, contains
415             # dtmf array with events
416             # $seq,$timestamp,$srcid: parameter for RTP packet
417             # Returns: @pkt
418             # (): no DTMF events to handle
419             # $pkt[0] eq '': DTMF in process, but no data
420             # @pkt: RTP packets to send
421             ###########################################################################
422             sub _generate_dtmf {
423 5469     5469   16311 my ($targs,$seq,$timestamp,$srcid) = @_;
424 5469         14254 my $dtmfs = $targs->{dtmf_gen};
425 5469 100 66     31394 $dtmfs and @$dtmfs or return;
426              
427 1413         4715 while ( @$dtmfs ) {
428 1497         3053 my $dtmf = $dtmfs->[0];
429 1497 100       5019 if ( my $duration = $dtmf->{duration} ) {
430             DEBUG(40,"generate dtmf ".(
431             $dtmf->{sub} ? '' :
432 1485 100       8523 defined $dtmf->{event} ? "<$dtmf->{event},$duration>" :
    100          
433             ""
434             ));
435             my $cb = $dtmf->{sub}
436 1485   66     5700 ||= dtmf_generator($dtmf->{event},$duration,%$dtmf);
437 1485         6660 my @pkt = $cb->($seq,$timestamp,$srcid);
438 1485 100       8832 return @pkt if @pkt;
439             }
440 96         222 shift(@$dtmfs);
441 96 100       1388 if ( my $cb = $dtmf->{cb_final} ) {
442 12         57 invoke_callback($cb,'OK');
443             }
444             }
445 12         38 return;
446             }
447              
448             1;