File Coverage

blib/lib/Metabrik/Forensic/Pcap.pm
Criterion Covered Total %
statement 12 129 9.3
branch 0 58 0.0
condition 0 21 0.0
subroutine 4 11 36.3
pod 5 7 71.4
total 21 226 9.2


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # forensic::pcap Brik
5             #
6             package Metabrik::Forensic::Pcap;
7 1     1   857 use strict;
  1         2  
  1         30  
8 1     1   5 use warnings;
  1         2  
  1         27  
9              
10 1     1   5 use base qw(Metabrik::Client::Elasticsearch::Query);
  1         2  
  1         1281  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             nodes => [ qw(node_list) ], # Inherited
20             index => [ qw(index) ], # Inherited
21             type => [ qw(type) ], # Inherited
22             },
23             attributes_default => {
24             index => 'forensicpcap-*',
25             type => 'pcap',
26             },
27             commands => {
28             create_client => [ ], # Inherited
29             reset_client => [ ], # Inherited
30             query => [ qw(query index|OPTIONAL type|OPTIONAL) ], # Inherited
31             from_json_file => [ qw(json_file index|OPTIONAL type|OPTIONAL) ], # Inherited
32             from_dump_file => [ qw(dump_file index|OPTIONAL type|OPTIONAL) ], # Inherited
33             pcap_to_elasticsearch => [ qw(file filter|OPTIONAL) ],
34             show_sessions => [ qw(ip_address port protocol) ],
35             },
36             require_modules => {
37             'Metabrik::File::Pcap' => [ ],
38             'Metabrik::Time::Universal' => [ ],
39             },
40             require_binaries => {
41             },
42             optional_binaries => {
43             },
44             need_packages => {
45             },
46             };
47             }
48              
49             sub brik_use_properties {
50 0     0 1   my $self = shift;
51              
52             return {
53 0           attributes_default => {
54             },
55             };
56             }
57              
58             sub brik_preinit {
59 0     0 1   my $self = shift;
60              
61             # Do your preinit here, return 0 on error.
62              
63 0           return $self->SUPER::brik_preinit;
64             }
65              
66             sub brik_init {
67 0     0 1   my $self = shift;
68              
69             # Do your init here, return 0 on error.
70              
71 0           return $self->SUPER::brik_init;
72             }
73              
74             sub pcap_to_elasticsearch {
75 0     0 0   my $self = shift;
76 0           my ($file, $filter) = @_;
77              
78 0   0       $filter ||= '';
79 0 0         $self->brik_help_run_undef_arg('pcap_to_elasticsearch', $file) or return;
80 0 0         $self->brik_help_run_file_not_found('pcap_to_elasticsearch', $file) or return;
81              
82 0 0         my $fp = Metabrik::File::Pcap->new_from_brik_init($self) or return;
83 0 0         $fp->open($file, 'read', $filter) or return;
84              
85 0 0         my $tu = Metabrik::Time::Universal->new_from_brik_init($self) or return;
86 0           my $today = $tu->today;
87              
88 0           my $index = "forensicpcap-$today";
89 0           my $type = "pcap";
90              
91 0 0         $self->create_client or return;
92              
93 0           my $count_before = 0;
94 0 0         if ($self->is_index_exists($index)) {
95 0           $count_before = $self->count($index, $type);
96 0 0         if (! defined($count_before)) {
97 0           return;
98             }
99 0           $self->log->info("pcap_to_elasticsearch: current index [$index] count is ".
100             "[$count_before]");
101             }
102              
103 0 0         $self->open_bulk_mode($index, $type) or return;
104              
105 0           $self->log->info("pcap_to_elasticsearch: importing file [$file] to index ".
106             "[$index] with type [$type]");
107              
108 0           my $print_re = qr/^[[:print:]]{5,}/;
109              
110 0           my $read = 0;
111 0           my $imported = 0;
112 0           while (1) {
113 0           my $h = $fp->read_next(10); # We read 10 by 10
114 0 0         if (! defined($h)) {
115 0           $self->log->error("pcap_to_elasticsearch: unable to read frame, skipping");
116 0           next;
117             }
118              
119 0 0         last if @$h == 0; # Eof
120              
121 0           $read += @$h;
122              
123 0           for my $this (@$h) {
124 0           my $simple = $fp->from_read($this);
125 0 0         if (! defined($simple)) {
126 0           $self->log->error("pcap_to_elasticsearch: unable to parse frame, skipping");
127 0           next;
128             }
129              
130 0           my $timestamp = $simple->timestamp;
131              
132 0           my $new = {
133             '@version' => 1,
134             '@timestamp' => $tu->timestamp_to_tz_time($timestamp),
135             };
136 0           my $skip_payload = 0;
137 0           for my $layer (reverse $simple->layers) {
138 0           my $this_layer = $layer->layer;
139              
140 0           my $class = ref($layer);
141 0           my %h = map { $_ => $layer->[$layer->cgGetIndice($_)] }
142 0           @{$class->cgGetAttributes};
  0            
143              
144             # The first we loop, we are using the last layer where we
145             # want to keep the payload. For all others, we remove it.
146 0 0         if ($skip_payload) {
147 0           delete $h{payload};
148             }
149             else {
150 0           $skip_payload = 1;
151             }
152             # We convert IPv4 and TCP options to hex
153 0 0         if ($layer->layer eq 'TCP') {
154 0           $h{options} = CORE::unpack('H*', $h{options});
155             # If payload does not seem printable, we encode in hex
156 0 0 0       if (defined($h{payload}) && $h{payload} !~ $print_re) {
157 0           $h{payload} = CORE::unpack('H*', $h{payload});
158             }
159             #print "payload[".$h{payload}."]\n" if defined($h{payload});
160             }
161 0 0         if ($layer->layer eq 'UDP') {
    0          
162             # If payload does not seem printable, we encode in hex
163 0 0 0       if (defined($h{payload}) && $h{payload} !~ $print_re) {
164 0           $h{payload} = CORE::unpack('H*', $h{payload});
165             }
166             #print "payload[".$h{payload}."]\n" if defined($h{payload});
167             }
168             elsif ($layer->layer eq 'IPv4') {
169 0           $h{options} = CORE::unpack('H*', $h{options});
170             }
171 0           delete $h{raw};
172 0           delete $h{nextLayer};
173 0           delete $h{noFixLen};
174              
175 0           $new->{$this_layer} = \%h;
176             }
177              
178 0           my $r = $self->index_bulk($new);
179 0 0         if (! defined($r)) {
180 0           $self->log->error("pcap_to_elasticsearch: bulk index failed for index ".
181             "[$index] at read [$read], skipping chunk");
182 0           next;
183             }
184              
185 0           $imported++;
186             }
187             }
188              
189 0           $self->bulk_flush;
190              
191 0           $self->refresh_index($index);
192              
193 0 0         my $count_current = $self->count($index, $type) or return;
194 0           $self->log->info("pcap_to_elasticsearch: after index [$index] count is ".
195             "[$count_current]");
196              
197 0           my $skipped = 0;
198 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
199 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
200 0           $imported = $read;
201             }
202             else {
203 0           $skipped = $read - ($count_current - $count_before);
204             }
205              
206 0 0         if (! $complete) {
207 0           $self->log->warning("pcap_to_elasticsearch: import incomplete");
208             }
209             else {
210 0           $self->log->info("pcap_to_elasticsearch: successfully imported [$read] frames");
211             }
212              
213 0           return 1;
214             }
215              
216             sub show_sessions {
217 0     0 0   my $self = shift;
218 0           my ($ip_address, $port, $protocol) = @_;
219              
220 0   0       $protocol ||= 'tcp';
221 0 0         $self->brik_help_run_undef_arg('show_sessions', $ip_address) or return;
222 0 0         $self->brik_help_run_undef_arg('show_sessions', $port) or return;
223              
224 0           $protocol = lc($protocol);
225 0 0 0       if ($protocol ne 'tcp' && $protocol ne 'udp') {
226 0           return $self->log->error("show_sessions: protocol must be tcp or udp");
227             }
228              
229             #
230             # TCP query
231             #
232             # (TCP.flags:16 OR TCP.flags:24)
233             # AND (IPv4.src:$ip OR IPv4.dst:$ip)
234             # AND (TCP.dst:$port OR TCP.src:$port)
235             #
236 0           my @should1 = ();
237 0           my @should2 = (
238             { term => { 'IPv4.src' => $ip_address } },
239             { term => { 'IPv4.dst' => $ip_address } },
240             );
241 0           my @should3 = ();
242              
243 0 0         if ($protocol eq 'tcp') {
244 0           push @should1, { term => { 'TCP.flags' => 16 } };
245 0           push @should1, { term => { 'TCP.flags' => 24 } };
246 0           push @should3, { term => { 'TCP.dst' => $port } };
247 0           push @should3, { term => { 'TCP.src' => $port } };
248             }
249             else {
250 0           push @should3, { term => { 'UDP.dst' => $port } };
251 0           push @should3, { term => { 'UDP.src' => $port } };
252             }
253              
254             # IPv4.dst:37.247.10.18 OR IPv4.src:37.247.10.18
255              
256 0           my $q = {
257             size => 1000,
258             sort => [
259             { '_uid' => { order => "asc" } },
260             { '@timestamp' => { order => "asc" } },
261             ],
262             query => {
263             bool => {
264             must => [
265             { bool => { should => \@should1, }, },
266             { bool => { should => \@should2, }, },
267             { bool => { should => \@should3, }, },
268             ],
269             }
270             },
271             };
272              
273 0 0         my $r = $self->query($q) or return;
274 0 0         my $hits = $self->get_query_result_hits($r) or return;
275              
276 1     1   12 use Data::Dumper;
  1         2  
  1         260  
277              
278 0           for my $this (@$hits) {
279             #print Dumper($this)."\n"; last;
280 0           $this = $this->{_source};
281 0           my $timestamp = $this->{'@timestamp'};
282 0           my $ip_src = $this->{IPv4}{src};
283 0           my $ip_dst = $this->{IPv4}{dst};
284 0   0       my $src = $this->{TCP}{src} || $this->{UDP}{src};
285 0   0       my $dst = $this->{TCP}{dst} || $this->{UDP}{dst};
286 0   0       my $payload = $this->{TCP}{payload} || $this->{UDP}{payload} || '';
287              
288 0           print "$timestamp: [$ip_src]:$src > [$ip_dst]:$dst [$payload]\n";
289             }
290              
291 0           return 1;
292             }
293              
294             sub brik_fini {
295 0     0 1   my $self = shift;
296              
297             # Do your fini here, return 0 on error.
298              
299 0           return $self->SUPER::brik_fini;
300             }
301              
302             1;
303              
304             __END__