File Coverage

blib/lib/Net/SIP/Simple/RTP.pm
Criterion Covered Total %
statement 177 219 80.8
branch 61 108 56.4
condition 32 64 50.0
subroutine 15 15 100.0
pod 2 2 100.0
total 287 408 70.3


line stmt bran cond sub pod time code
1             ###########################################################################
2             # Net::SIP::Simple::RTP
3             # implements some RTP behaviors
4             # - media_recv_echo: receive and echo data with optional delay back
5             # can save received data
6             # - media_send_recv: receive and optionally save data. Sends back data
7             # from file with optional repeat count
8             ###########################################################################
9              
10 41     41   287 use strict;
  41         84  
  41         1179  
11 41     41   215 use warnings;
  41         83  
  41         1595  
12              
13             package Net::SIP::Simple::RTP;
14              
15 41     41   232 use Net::SIP::Util qw(invoke_callback ip_sockaddr2parts ip_parts2string);
  41         67  
  41         2547  
16 41     41   308 use Socket;
  41         89  
  41         21938  
17 41     41   305 use Net::SIP::Debug;
  41         79  
  41         267  
18 41     41   18808 use Net::SIP::DTMF;
  41         136  
  41         2864  
19 41     41   322 use Net::SIP::Dispatcher::Eventloop;
  41         99  
  41         2558  
20              
21              
22             # on MSWin32 non-blocking sockets are not supported from IO::Socket
23 41     41   259 use constant CAN_NONBLOCKING => $^O ne 'MSWin32';
  41         85  
  41         106683  
24              
25             ###########################################################################
26             # creates function which will initialize Media for echo back
27             # Args: ($writeto,$delay)
28             # $delay: how much packets delay between receive and echo back (default 0)
29             # if <0 no ddata will be send back (e.g. recv only)
30             # $writeto: where to save received data (default: don't save)
31             # Returns: [ \&sub,@args ]
32             ###########################################################################
33             sub media_recv_echo {
34 53     53 1 215 my ($writeto,$delay) = @_;
35              
36             my $sub = sub {
37 30     30   102 my ($delay,$writeto,$call,$args) = @_;
38              
39 30         88 my $lsocks = $args->{media_lsocks};
40 30   33     505 my $ssocks = $args->{media_ssocks} || $lsocks;
41 30         131 my $raddr = $args->{media_raddr};
42 30         66 my $mdtmf = $args->{media_dtmfxtract};
43 30         68 my $didit = 0;
44 30         188 for( my $i=0;$i<@$lsocks;$i++ ) {
45 30   50     133 my $sock = $lsocks->[$i] || next;
46 30 50       211 $sock = $sock->[0] if UNIVERSAL::isa( $sock,'ARRAY' );
47 30         76 my $s_sock = $ssocks->[$i];
48 30 50       180 $s_sock = $s_sock->[0] if UNIVERSAL::isa( $s_sock,'ARRAY' );
49              
50 30         80 my $addr = $raddr->[$i];
51 30 100       177 $addr = $addr->[0] if ref($addr);
52              
53 30         62 my @delay_buffer;
54 30         73 my $channel = $i;
55             my $echo_back = sub {
56 1553         6250 my ($s_sock,$remote,$delay_buffer,$delay,$writeto,$targs,$didit,$sock) = @_;
57             {
58 1553 100       3131 my ($buf,$mpt,$seq,$tstamp,$ssrc,$csrc) =
  3304         11853  
59             _receive_rtp($sock,$writeto,$targs,$didit,$channel)
60             or last;
61             #DEBUG( "$didit=$$didit" );
62 1801         5852 $$didit = 1;
63              
64 1801 100 66     9130 last if ! $s_sock || ! $remote; # call on hold ?
65              
66 1751         6396 my @pkt = _generate_dtmf($targs,$seq,$tstamp,0x1234);
67 1751 50 33     5174 if (@pkt && $pkt[0] ne '') {
68 0         0 DEBUG( 100,"send DTMF to RTP");
69 0         0 send( $s_sock,$_,0,$remote ) for(@pkt);
70 0         0 return; # send DTMF *instead* of echo data
71             }
72              
73 1751 50       4755 last if $delay<0;
74 1751         4651 push @$delay_buffer, $buf;
75 1751         5110 while ( @$delay_buffer > $delay ) {
76 1751         118517 send( $s_sock,shift(@$delay_buffer),0,$remote );
77             }
78 1751         7661 CAN_NONBLOCKING && redo; # try recv again
79             }
80 30         660 };
81              
82             $call->{loop}->addFD($sock, EV_READ,
83             [ $echo_back,$s_sock,$addr,\@delay_buffer,$delay || 0,$writeto,{
84             dtmf_gen => $args->{dtmf_events},
85             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
86 30   50     1149 && [ $mdtmf->[$i], $args->{cb_dtmf} ],
      50        
87             },\$didit ],
88             'rtp_echo_back' );
89 30         237 my $reset_to_blocking = CAN_NONBLOCKING && $s_sock->blocking(0);
90 30         619 push @{ $call->{ rtp_cleanup }}, [ sub {
91 30         102 my ($call,$sock,$rb) = @_;
92 30         204 DEBUG( 100,"rtp_cleanup: remove socket %d",fileno($sock));
93 30         234 $call->{loop}->delFD( $sock );
94 30 50       334 $sock->blocking(1) if $rb;
95 30         680 }, $call,$sock,$reset_to_blocking ];
96             }
97              
98             # on RTP inactivity for at least 10 seconds close connection
99             my $timer = $call->{dispatcher}->add_timer( 10,
100             [ sub {
101 0         0 my ($call,$didit,$timer) = @_;
102 0 0       0 if ( $$didit ) {
103 0         0 $$didit = 0;
104             } else {
105 0         0 DEBUG(10, "closing call because if inactivity" );
106 0         0 $call->bye;
107 0         0 $timer->cancel;
108             }
109 30         705 }, $call,\$didit ],
110             10,
111             'rtp_inactivity',
112             );
113 30         759 push @{ $call->{ rtp_cleanup }}, [
114             sub {
115 30         223 shift->cancel;
116 30         124 DEBUG( 100,"cancel RTP timer" );
117             },
118 30         103 $timer
119             ];
120 53         1999 };
121              
122 53         542 return [ $sub,$delay,$writeto ];
123             }
124              
125             ###########################################################################
126             # creates function which will initialize Media for saving received data
127             # into file and sending data from another file
128             # Args: ($readfrom;$repeat,$writeto)
129             # $readfrom: where to read data for sending from (filename or callback
130             # which returns payload)
131             # $repeat: if <= 0 the data in $readfrom will be send again and again
132             # if >0 the data in $readfrom will be send $repeat times
133             # $writeto: where to save received data (undef == don't save), either
134             # filename or callback which gets packet as argument
135             # Returns: [ \&sub,@args ]
136             ###########################################################################
137             sub media_send_recv {
138 14     14 1 90 my ($readfrom,$repeat,$writeto) = @_;
139              
140             my $sub = sub {
141 14     14   47 my ($writeto,$readfrom,$repeat,$call,$args) = @_;
142              
143 14         49 my $lsocks = $args->{media_lsocks};
144 14   33     234 my $ssocks = $args->{media_ssocks} || $lsocks;
145 14         49 my $raddr = $args->{media_raddr};
146 14         31 my $mdtmf = $args->{media_dtmfxtract};
147 14         30 my $didit = 0;
148 14         184 for( my $i=0;$i<@$lsocks;$i++ ) {
149 14         57 my $channel = $i;
150 14         49 my $sock = $lsocks->[$i];
151 14         30 my ($timer,$reset_to_blocking);
152              
153             # recv once I get an event on RTP socket
154 14 50       59 if ($sock) {
155 14 50       78 $sock = $sock->[0] if UNIVERSAL::isa( $sock,'ARRAY' );
156             my $receive = sub {
157 2924         9343 my ($writeto,$targs,$didit,$sock) = @_;
158 2924         5152 while (1) {
159 6331         18715 my $buf = _receive_rtp($sock,$writeto,$targs,$didit,$channel);
160 6331 100       30872 defined($buf) or return;
161 3407         5927 CAN_NONBLOCKING or return;
162             }
163 14         245 };
164             $call->{loop}->addFD($sock, EV_READ,
165             [
166             $receive,
167             $writeto,
168             {
169             dtmf_gen => $args->{dtmf_events},
170             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
171 14   50     710 && [ $mdtmf->[$i], $args->{cb_dtmf} ],
172             },
173             \$didit
174             ],
175             'rtp_receive'
176             );
177 14         109 $reset_to_blocking = CAN_NONBLOCKING && $sock->blocking(0);
178             }
179              
180             # sending need to be done with a timer
181             # ! $addr == call on hold
182 14         399 my $addr = $raddr->[$i];
183 14 50       922 $addr = $addr->[0] if ref($addr);
184 14 50 33     250 if ($addr and my $s_sock = $ssocks->[$i]) {
185 14 50       102 $s_sock = $s_sock->[0] if UNIVERSAL::isa( $s_sock,'ARRAY' );
186 14   100     89 my $cb_done = $args->{cb_rtp_done} || sub { shift->bye };
187             $timer = $call->{dispatcher}->add_timer(
188             0, # start immediately
189             [ \&_send_rtp,$s_sock,$call->{loop},$addr,$readfrom,$channel, {
190             repeat => $repeat || 1,
191 12         68 cb_done => [ sub { invoke_callback(@_) }, $cb_done, $call ],
192             rtp_param => $args->{rtp_param},
193             dtmf_gen => $args->{dtmf_events},
194             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
195             && [ $mdtmf->[$i], $args->{cb_dtmf} ],
196             }],
197 14   50     768 $args->{rtp_param}[2], # repeat timer
      50        
198             'rtpsend',
199             );
200             }
201              
202 14         330 push @{ $call->{rtp_cleanup}}, [ sub {
203 14         50 my ($call,$sock,$timer,$rb) = @_;
204 14 50       59 if ($sock) {
205 14         95 $call->{loop}->delFD($sock);
206 14 50       112 $sock->blocking(1) if $rb;
207             }
208 14 50       364 $timer->cancel() if $timer;
209 14         32 }, $call,$sock,$timer,$reset_to_blocking ];
210             }
211              
212             # on RTP inactivity for at least 10 seconds close connection
213             my $timer = $call->{dispatcher}->add_timer( 10,
214             [ sub {
215 0         0 my ($call,$args,$didit,$timer) = @_;
216 0 0       0 if ( $$didit ) {
217 0         0 $$didit = 0;
218             } else {
219 0         0 DEBUG( 10,"closing call because if inactivity" );
220 0         0 $call->bye;
221 0         0 $timer->cancel;
222             }
223 14         330 }, $call,$args,\$didit ],
224             10,
225             'rtp_inactivity',
226             );
227 14         495 push @{ $call->{ rtp_cleanup }}, [ sub { shift->cancel }, $timer ];
  14         216  
  14         57  
228 14         586 };
229              
230 14         267 return [ $sub,$writeto,$readfrom,$repeat ];
231             }
232              
233             ###########################################################################
234             # Helper to receive RTP and optionally save it to file
235             # Args: ($sock,$writeto,$targs,$didit,$channel)
236             # $sock: RTP socket
237             # $writeto: filename for saving or callback which gets packet as argument
238             # $targs: \%hash to hold state info between calls of this function
239             # $didit: reference to scalar which gets set to TRUE on each received packet
240             # and which gets set to FALSE from a timer, thus detecting inactivity
241             # $channel: index of RTP channel
242             # Return: $packet
243             # $packet: received RTP packet (including header)
244             ###########################################################################
245             sub _receive_rtp {
246 9635     9635   25609 my ($sock,$writeto,$targs,$didit,$channel) = @_;
247              
248 9635         136386 my $from = recv( $sock,my $buf,2**16,0 );
249 9635 100 66     82555 return if ! $from || !defined($buf) || $buf eq '';
      66        
250 5208         28348 DEBUG( 50,"received %d bytes from RTP", length($buf));
251              
252 5208         12782 if(0) {
253             DEBUG( "got data on socket %d %s from %s",fileno($sock),
254             ip_sockaddr2string(getsockname($sock)),
255             ip_sockaddr2string($from));
256             }
257              
258 5208         10544 $$didit = 1;
259 5208         13777 my $packet = $buf;
260              
261 5208         36758 my ($vpxcc,$mpt,$seq,$tstamp,$ssrc) = unpack( 'CCnNN',substr( $buf,0,12,'' ));
262 5208         16203 my $version = ($vpxcc & 0xc0) >> 6;
263 5208 50       14029 if ( $version != 2 ) {
264 0         0 DEBUG( 100,"RTP version $version" );
265             return
266 0         0 }
267             # skip csrc headers
268 5208         9773 my $cc = $vpxcc & 0x0f;
269 5208   33     12832 my $csrc = $cc && substr( $buf,0,4*$cc,'' );
270              
271             # skip extension header
272 5208 50       13364 my $xh = $vpxcc & 0x10 ? (unpack( 'nn', substr( $buf,0,4,'' )))[1] : 0;
273 5208 50       13649 substr( $buf,0,4*$xh,'' ) if $xh;
274              
275             # ignore padding
276 5208 50       10820 my $padding = $vpxcc & 0x20 ? unpack( 'C', substr($buf,-1,1)) : 0;
277 5208 50       14721 my $payload = $padding ? substr( $buf,0,length($buf)-$padding ): $buf;
278              
279 5208         24056 DEBUG( 100,"ch=%d payload=%d/%d pt=%d xh=%d padding=%d cc=%d",
280             $channel, $seq, length($payload), $mpt & 0x7f, $xh, $padding, $cc);
281 5208 50 33     22096 if ( $targs->{ssrc} && $targs->{ssrc} != $ssrc ) {
282             # RTP stream has changed, reset rseq
283 0         0 delete $targs->{rseq};
284             }
285 5208 50 66     31274 if ( defined $targs->{rseq} && $seq<= $targs->{rseq}
      33        
286             && $targs->{rseq} - $seq < 60000 ) {
287 0         0 DEBUG( 10,"seq=$seq last=$targs->{rseq} - dropped" );
288 0         0 return;
289             }
290 5208         11284 $targs->{rseq} = $seq;
291              
292 5208 100       18098 if ( ref($writeto)) {
    50          
293             # callback
294 1801         8893 invoke_callback($writeto,$payload,$seq,$tstamp,$channel,$mpt & 0x7f);
295             } elsif ( $writeto ) {
296             # save into file
297 0         0 my $fd = $targs->{fdr};
298 0 0       0 if ( !$fd ) {
299 0 0       0 open( $fd,'>',$writeto ) || die $!;
300 0         0 $targs->{fdr} = $fd
301             }
302 0         0 syswrite($fd,$payload);
303             }
304              
305 5208 100       36498 if ( my $xt = $targs->{dtmf_xtract} ) {
306 4208         10409 my ($sub,$cb) = @$xt;
307 4208 100       16466 if ( my ($event,$duration) = $sub->($packet)) {
308 72         562 DEBUG(40,"received dtmf <$event,$duration>");
309 72         462 $cb->($event,$duration);
310             }
311             }
312              
313 5208 100       29850 return wantarray ? ( $packet,$mpt,$seq,$tstamp,$ssrc,$csrc ): $packet;
314             }
315              
316             ###########################################################################
317             # Helper to read RTP data from file (PCMU 8000) and send them through
318             # the RTP socket
319             # Args: ($sock,$loop,$addr,$readfrom,$targs,$timer)
320             # $sock: RTP socket
321             # $loop: event loop (used for looptime for timestamp)
322             # $addr: where to send data
323             # $readfrom: filename for reading or callback which will return payload
324             # $channel: index of RTP channel
325             # $targs: \%hash to hold state info between calls of this function
326             # especially 'repeat' holds the number of times this data has to be
327             # send (<=0 means forever) and 'cb_done' holds a [\&sub,@arg] callback
328             # to end the call after sending all data
329             # 'repeat' makes only sense if $readfrom is filename
330             # $timer: timer which gets canceled once all data are send
331             # Return: NONE
332             ###########################################################################
333             sub _send_rtp {
334 3520     3520   12941 my ($sock,$loop,$addr,$readfrom,$channel,$targs,$timer) = @_;
335              
336 3520         9450 $targs->{wseq}++;
337 3520         7489 my $seq = $targs->{wseq};
338              
339             # 32 bit timestamp based on seq and packet size
340 3520         13203 my $timestamp = ( $targs->{rtp_param}[1] * $seq ) % 2**32;
341              
342 3520         11750 my @pkt = _generate_dtmf($targs,$seq,$timestamp,0x1234);
343 3520 100 100     16397 if (@pkt && $pkt[0] ne '') {
344 1307         5178 DEBUG( 100,"send DTMF to RTP");
345 1307         104022 send( $sock,$_,0,$addr ) for(@pkt);
346 1307         11552 return;
347             }
348              
349 2213         7107 my $buf;
350             my $rtp_event;
351 2213         0 my $payload_type;
352              
353 2213 50       7707 if ( ref($readfrom) ) {
354             # payload by callback
355 2213         7406 $buf = invoke_callback($readfrom,$seq,$channel);
356 2213 100       43641 if ( !$buf ) {
357 12         66 DEBUG( 50, "no more data from callback" );
358 12 50       101 $timer && $timer->cancel;
359 12         58 invoke_callback( $targs->{cb_done} );
360 12         215 return;
361             }
362 2201 50       6877 ($buf,$payload_type,$rtp_event,$timestamp) = @$buf if ref($buf);
363             } else {
364             # read from file
365 0         0 for(my $tries = 0; $tries<2;$tries++ ) {
366 0   0     0 $targs->{wseq} ||= int( rand( 2**16 ));
367 0         0 my $fd = $targs->{fd};
368 0 0       0 if ( !$fd ) {
369 0 0       0 $targs->{repeat} = -1 if $targs->{repeat} < 0;
370 0 0       0 if ( $targs->{repeat} == 0 ) {
371             # no more sending
372 0         0 DEBUG( 50, "no more data from file" );
373 0 0       0 $timer && $timer->cancel;
374 0         0 invoke_callback( $targs->{cb_done} );
375 0         0 return;
376             }
377              
378 0 0       0 open( $fd,'<',$readfrom ) || die $!;
379 0         0 $targs->{fd} = $fd;
380             }
381 0         0 my $size = $targs->{rtp_param}[1]; # 160 for PCMU/8000
382 0 0       0 last if read( $fd,$buf,$size ) == $size;
383             # try to reopen file
384 0         0 close($fd);
385 0         0 $targs->{fd} = undef;
386 0         0 $targs->{repeat}--;
387             }
388             }
389              
390 2201 50 33     12143 die $! if ! defined $buf or $buf eq '';
391 2201         3671 if (0) {
392             DEBUG(50, "%s -> %s seq=%d ts=%x",
393             ip_sockaddr2string(getsockname($sock)),
394             ip_sockaddr2string($addr),
395             $seq, $timestamp
396             );
397             }
398              
399             # add RTP header
400 2201 50       6467 $rtp_event = 0 if ! defined $rtp_event;
401 2201 50 50     10279 $payload_type = $targs->{rtp_param}[0]||0 # 0 == PMCU 8000
402             if ! defined $payload_type;
403              
404 2201         14946 my $header = pack('CCnNN',
405             0b10000000, # Version 2
406             $payload_type | ( $rtp_event << 7 ) ,
407             $seq, # sequence
408             $timestamp,
409             0x1234, # source ID
410             );
411 2201         9097 DEBUG( 100,"send %d bytes to RTP", length($buf));
412 2201         187618 send( $sock,$header.$buf,0,$addr );
413             }
414              
415             ###########################################################################
416             # Helper to send DTMF
417             # Args: ($targs,$seq,$timestamp,$srcid)
418             # $targs: hash which is shared with _send_rtp and other callbacks, contains
419             # dtmf array with events
420             # $seq,$timestamp,$srcid: parameter for RTP packet
421             # Returns: @pkt
422             # (): no DTMF events to handle
423             # $pkt[0] eq '': DTMF in process, but no data
424             # @pkt: RTP packets to send
425             ###########################################################################
426             sub _generate_dtmf {
427 5271     5271   13968 my ($targs,$seq,$timestamp,$srcid) = @_;
428 5271         11670 my $dtmfs = $targs->{dtmf_gen};
429 5271 100 66     30119 $dtmfs and @$dtmfs or return;
430              
431 1415         4326 while ( @$dtmfs ) {
432 1499         2741 my $dtmf = $dtmfs->[0];
433 1499 100       4756 if ( my $duration = $dtmf->{duration} ) {
434             DEBUG(40,"generate dtmf ".(
435             $dtmf->{sub} ? '' :
436 1487 100       8694 defined $dtmf->{event} ? "<$dtmf->{event},$duration>" :
    100          
437             ""
438             ));
439             my $cb = $dtmf->{sub}
440 1487   66     6177 ||= dtmf_generator($dtmf->{event},$duration,%$dtmf);
441 1487         6480 my @pkt = $cb->($seq,$timestamp,$srcid);
442 1487 100       8391 return @pkt if @pkt;
443             }
444 96         242 shift(@$dtmfs);
445 96 100       1859 if ( my $cb = $dtmf->{cb_final} ) {
446 12         66 invoke_callback($cb,'OK');
447             }
448             }
449 12         47 return;
450             }
451              
452             1;