| lib/Mail/DMARC/Report/Store/SQL.pm | |||
|---|---|---|---|
| Criterion | Covered | Total | % |
| statement | 357 | 390 | 91.5 |
| branch | 107 | 182 | 58.7 |
| condition | 22 | 53 | 41.5 |
| subroutine | 46 | 47 | 97.8 |
| pod | 0 | 35 | 0.0 |
| total | 532 | 707 | 75.2 |
| line | stmt | bran | cond | sub | pod | time | code |
|---|---|---|---|---|---|---|---|
| 1 | package Mail::DMARC::Report::Store::SQL; | ||||||
| 2 | our $VERSION = '1.20210927'; | ||||||
| 3 | 4 | 4 | 863 | use strict; | |||
| 4 | 8 | ||||||
| 4 | 132 | ||||||
| 4 | 4 | 4 | 25 | use warnings; | |||
| 4 | 8 | ||||||
| 4 | 124 | ||||||
| 5 | |||||||
| 6 | 4 | 4 | 21 | use Carp; | |||
| 4 | 9 | ||||||
| 4 | 300 | ||||||
| 7 | 4 | 4 | 25 | use Data::Dumper; | |||
| 4 | 8 | ||||||
| 4 | 323 | ||||||
| 8 | 4 | 4 | 2282 | use DBIx::Simple; | |||
| 4 | 83166 | ||||||
| 4 | 152 | ||||||
| 9 | 4 | 4 | 37 | use File::ShareDir; | |||
| 4 | 9 | ||||||
| 4 | 194 | ||||||
| 10 | |||||||
| 11 | 4 | 4 | 2231 | use Mail::DMARC::Report::Store::SQL::Grammars::MySQL; | |||
| 4 | 12 | ||||||
| 4 | 162 | ||||||
| 12 | 4 | 4 | 2005 | use Mail::DMARC::Report::Store::SQL::Grammars::SQLite; | |||
| 4 | 10 | ||||||
| 4 | 171 | ||||||
| 13 | 4 | 4 | 1927 | use Mail::DMARC::Report::Store::SQL::Grammars::PostgreSQL; | |||
| 4 | 20 | ||||||
| 4 | 163 | ||||||
| 14 | |||||||
| 15 | 4 | 4 | 47 | use parent 'Mail::DMARC::Base'; | |||
| 4 | 9 | ||||||
| 4 | 31 | ||||||
| 16 | 4 | 4 | 376 | use Mail::DMARC::Report::Aggregate; | |||
| 4 | 8 | ||||||
| 4 | 18044 | ||||||
| 17 | |||||||
| 18 | sub save_aggregate { | ||||||
| 19 | 5 | 5 | 0 | 13 | my ( $self, $agg ) = @_; | ||
| 20 | |||||||
| 21 | 5 | 28 | $self->db_connect(); | ||||
| 22 | |||||||
| 23 | 5 | 50 | 50 | croak "policy_published must be a Mail::DMARC::Policy object" | |||
| 24 | if 'Mail::DMARC::Policy' ne ref $agg->policy_published; | ||||||
| 25 | |||||||
| 26 | #warn Dumper($meta); ## no critic (Carp) | ||||||
| 27 | 5 | 24 | foreach my $f ( qw/ org_name email begin end / ) { | ||||
| 28 | 20 | 50 | 73 | croak "meta field $f required" if ! $agg->metadata->$f; | |||
| 29 | } | ||||||
| 30 | |||||||
| 31 | 5 | 50 | 34 | my $rid = $self->get_report_id( $agg ) | |||
| 32 | or croak "failed to create report!"; | ||||||
| 33 | |||||||
| 34 | # on 6/8/2013, Microsoft spat out a bunch of reports with zero records. | ||||||
| 35 | 5 | 50 | 46 | if ( ! $agg->record ) { | |||
| 36 | 0 | 0 | warn "\ta report with ZERO records! Illegal.\n"; ## no critic (Carp) | ||||
| 37 | 0 | 0 | return $rid; | ||||
| 38 | }; | ||||||
| 39 | |||||||
| 40 | 5 | 19 | foreach my $rec ( @{ $agg->record } ) { | ||||
| 5 | 22 | ||||||
| 41 | 5 | 33 | $self->insert_agg_record($rid, $rec); | ||||
| 42 | }; | ||||||
| 43 | |||||||
| 44 | 5 | 90 | return $rid; | ||||
| 45 | } | ||||||
| 46 | |||||||
| 47 | sub retrieve { | ||||||
| 48 | 6 | 6 | 0 | 3579 | my ( $self, %args ) = @_; | ||
| 49 | |||||||
| 50 | 6 | 18 | my $query = $self->grammar->select_report_query; | ||||
| 51 | 6 | 11 | my @params; | ||||
| 52 | |||||||
| 53 | 6 | 100 | 18 | if ( $args{rid} ) { | |||
| 54 | 1 | 4 | $query .= $self->grammar->and_arg('r.id'); | ||||
| 55 | 1 | 3 | push @params, $args{rid}; | ||||
| 56 | }; | ||||||
| 57 | 6 | 100 | 17 | if ( $args{begin} ) { | |||
| 58 | 1 | 5 | $query .= $self->grammar->and_arg('r.begin', '>='); | ||||
| 59 | 1 | 3 | push @params, $args{begin}; | ||||
| 60 | }; | ||||||
| 61 | 6 | 100 | 15 | if ( $args{end} ) { | |||
| 62 | 1 | 5 | $query .= $self->grammar->and_arg('r.end', '<='); | ||||
| 63 | 1 | 5 | push @params, $args{end}; | ||||
| 64 | }; | ||||||
| 65 | 6 | 100 | 15 | if ( $args{author} ) { | |||
| 66 | 1 | 6 | $query .= $self->grammar->and_arg('a.org_name'); | ||||
| 67 | 1 | 4 | push @params, $args{author}; | ||||
| 68 | }; | ||||||
| 69 | 6 | 100 | 15 | if ( $args{from_domain} ) { | |||
| 70 | 1 | 4 | $query .= $self->grammar->and_arg('fd.domain'); | ||||
| 71 | 1 | 4 | push @params, $args{from_domain}; | ||||
| 72 | }; | ||||||
| 73 | |||||||
| 74 | 6 | 18 | my $reports = $self->query( $query, \@params ); | ||||
| 75 | |||||||
| 76 | 6 | 15 | foreach (@$reports ) { | ||||
| 77 | 6 | 34 | $_->{begin} = join(" ", split(/T/, $self->epoch_to_iso( $_->{begin} ))); | ||||
| 78 | 6 | 22 | $_->{end} = join(" ", split(/T/, $self->epoch_to_iso( $_->{end} ))); | ||||
| 79 | }; | ||||||
| 80 | 6 | 22 | return $reports; | ||||
| 81 | } | ||||||
| 82 | |||||||
| 83 | sub next_todo { | ||||||
| 84 | 8 | 8 | 0 | 24 | my ( $self ) = @_; | ||
| 85 | |||||||
| 86 | 8 | 100 | 30 | if ( ! exists $self->{ _todo_list } ) { | |||
| 87 | 4 | 15 | $self->{_todo_list} = $self->query( $self->grammar->select_todo_query, [ $self->time ] ); | ||||
| 88 | 4 | 50 | 23 | return if ! $self->{_todo_list}; | |||
| 89 | } | ||||||
| 90 | |||||||
| 91 | 8 | 16 | my $next_todo = shift @{ $self->{_todo_list} }; | ||||
| 8 | 23 | ||||||
| 92 | 8 | 100 | 34 | if ( ! $next_todo ) { | |||
| 93 | 4 | 13 | delete $self->{_todo_list}; | ||||
| 94 | 4 | 19 | return; | ||||
| 95 | } | ||||||
| 96 | |||||||
| 97 | 4 | 43 | my $agg = Mail::DMARC::Report::Aggregate->new(); | ||||
| 98 | 4 | 24 | $self->populate_agg_metadata( \$agg, \$next_todo ); | ||||
| 99 | |||||||
| 100 | 4 | 34 | my $pp = $self->get_report_policy_published( $next_todo->{rid} ); | ||||
| 101 | 4 | 11 | $pp->{domain} = $next_todo->{from_domain}; | ||||
| 102 | 4 | 24 | $agg->policy_published( Mail::DMARC::Policy->new( %$pp ) ); | ||||
| 103 | |||||||
| 104 | 4 | 24 | $self->populate_agg_records( \$agg, $next_todo->{rid} ); | ||||
| 105 | 4 | 61 | return $agg; | ||||
| 106 | } | ||||||
| 107 | |||||||
| 108 | sub retrieve_todo { | ||||||
| 109 | 1 | 1 | 0 | 569 | my ( $self, @args ) = @_; | ||
| 110 | |||||||
| 111 | # this method extracts the data from the SQL tables and populates a | ||||||
| 112 | # list of Aggregate report objects with them. | ||||||
| 113 | 1 | 5 | my $reports = $self->query( $self->grammar->select_todo_query, [ $self->time ] ); | ||||
| 114 | 1 | 3 | my @reports_todo; | ||||
| 115 | 1 | 50 | 6 | return \@reports_todo if ! scalar @$reports; | |||
| 116 | |||||||
| 117 | 1 | 2 | foreach my $report ( @{ $reports } ) { | ||||
| 1 | 3 | ||||||
| 118 | |||||||
| 119 | 1 | 15 | my $agg = Mail::DMARC::Report::Aggregate->new(); | ||||
| 120 | 1 | 7 | $self->populate_agg_metadata( \$agg, \$report ); | ||||
| 121 | |||||||
| 122 | 1 | 5 | my $pp = $self->get_report_policy_published( $report->{rid} ); | ||||
| 123 | 1 | 5 | $pp->{domain} = $report->{from_domain}; | ||||
| 124 | 1 | 6 | $agg->policy_published( Mail::DMARC::Policy->new( %$pp ) ); | ||||
| 125 | |||||||
| 126 | 1 | 6 | $self->populate_agg_records( \$agg, $report->{rid} ); | ||||
| 127 | 1 | 9 | push @reports_todo, $agg; | ||||
| 128 | } | ||||||
| 129 | 1 | 7 | return \@reports_todo; | ||||
| 130 | } | ||||||
| 131 | |||||||
| 132 | sub delete_report { | ||||||
| 133 | 8 | 8 | 0 | 999 | my $self = shift; | ||
| 134 | 8 | 50 | 36 | my $report_id = shift or croak "missing report ID"; | |||
| 135 | 8 | 50 | 69 | print "deleting report $report_id\n" if $self->verbose; | |||
| 136 | |||||||
| 137 | # deletes with FK don't cascade in SQLite? Clean each table manually | ||||||
| 138 | 8 | 36 | my $rows = $self->query( $self->grammar->report_record_id, [$report_id] ); | ||||
| 139 | 8 | 30 | my @row_ids = map { $_->{id} } @$rows; | ||||
| 4 | 18 | ||||||
| 140 | |||||||
| 141 | 8 | 100 | 31 | if (scalar @row_ids) { | |||
| 142 | 4 | 10 | foreach my $table (qw/ report_record_spf report_record_dkim report_record_reason /) { | ||||
| 143 | 12 | 50 | 5434 | print "deleting $table rows " . join(',', @row_ids) . "\n" if $self->verbose; | |||
| 144 | 12 | 23 | eval { $self->query( $self->grammar->delete_from_where_record_in($table, \@row_ids)); }; | ||||
| 12 | 33 | ||||||
| 145 | # warn $@ if $@; | ||||||
| 146 | } | ||||||
| 147 | } | ||||||
| 148 | 8 | 2561 | foreach my $table (qw/ report_policy_published report_record report_error /) { | ||||
| 149 | 24 | 50 | 7321 | print "deleting $table rows for report $report_id\n" if $self->verbose; | |||
| 150 | 24 | 42 | eval { $self->query( $self->grammar->delete_from_where_report( $table, [$report_id] )); }; | ||||
| 24 | 84 | ||||||
| 151 | # warn $@ if $@; | ||||||
| 152 | } | ||||||
| 153 | |||||||
| 154 | # In MySQL, where FK constraints DO cascade, this is the only query needed | ||||||
| 155 | 8 | 3354 | $self->query( $self->grammar->delete_report, [$report_id] ); | ||||
| 156 | 8 | 78 | return 1; | ||||
| 157 | } | ||||||
| 158 | |||||||
| 159 | sub get_domain_id { | ||||||
| 160 | 48 | 48 | 0 | 197 | my ( $self, $domain ) = @_; | ||
| 161 | 48 | 50 | 154 | croak "missing domain calling " . ( caller(0) )[3] if !$domain; | |||
| 162 | 48 | 201 | my $r = $self->query( $self->grammar->select_domain_id, [$domain] ); | ||||
| 163 | 48 | 100 | 50 | 357 | if ( $r && scalar @$r ) { | ||
| 164 | 27 | 162 | return $r->[0]{id}; | ||||
| 165 | } | ||||||
| 166 | 21 | 79 | return $self->query( $self->grammar->insert_domain, [$domain]); | ||||
| 167 | } | ||||||
| 168 | |||||||
| 169 | sub get_author_id { | ||||||
| 170 | 12 | 12 | 0 | 68 | my ( $self, $meta ) = @_; | ||
| 171 | 12 | 50 | 67 | croak "missing author name" if !$meta->org_name; | |||
| 172 | 12 | 56 | my $r = $self->query( | ||||
| 173 | $self->grammar->select_author_id, | ||||||
| 174 | [ $meta->org_name ] | ||||||
| 175 | ); | ||||||
| 176 | 12 | 100 | 50 | 107 | if ( $r && scalar @$r ) { | ||
| 177 | 3 | 17 | return $r->[0]{id}; | ||||
| 178 | } | ||||||
| 179 | 9 | 50 | 42 | carp "missing email" if !$meta->email; | |||
| 180 | 9 | 38 | return $self->query( | ||||
| 181 | $self->grammar->insert_author, | ||||||
| 182 | [ $meta->org_name, $meta->email, $meta->extra_contact_info ] | ||||||
| 183 | ); | ||||||
| 184 | } | ||||||
| 185 | |||||||
| 186 | sub get_report_id { | ||||||
| 187 | 9 | 9 | 0 | 34 | my ( $self, $aggr ) = @_; | ||
| 188 | |||||||
| 189 | 9 | 37 | my $meta = $aggr->metadata; | ||||
| 190 | 9 | 49 | my $pol = $aggr->policy_published; | ||||
| 191 | |||||||
| 192 | # check if report exists | ||||||
| 193 | 9 | 50 | 45 | my $author_id = $self->get_author_id( $meta ) or croak; | |||
| 194 | 9 | 50 | 608 | my $from_dom_id = $self->get_domain_id( $pol->domain ) or croak; | |||
| 195 | |||||||
| 196 | 9 | 206 | my $ids; | ||||
| 197 | 9 | 50 | 92 | if ( $meta->report_id ) { | |||
| 198 | # reports arriving via the wire will have an author ID & report ID | ||||||
| 199 | 0 | 0 | $ids = $self->query( | ||||
| 200 | $self->grammar->select_report_id, | ||||||
| 201 | [ $meta->report_id, $author_id ] | ||||||
| 202 | ); | ||||||
| 203 | } | ||||||
| 204 | else { | ||||||
| 205 | # Reports submitted by our local MTA will not have a report ID | ||||||
| 206 | # They aggregate on the From domain, where the DMARC policy was discovered | ||||||
| 207 | 9 | 49 | $ids = $self->query( | ||||
| 208 | $self->grammar->select_id_with_end, | ||||||
| 209 | [ $from_dom_id, $self->time, $author_id ] | ||||||
| 210 | ); | ||||||
| 211 | }; | ||||||
| 212 | |||||||
| 213 | 9 | 50 | 119 | if ( scalar @$ids ) { # report already exists | |||
| 214 | 0 | 0 | return $self->{report_id} = $ids->[0]{id}; | ||||
| 215 | } | ||||||
| 216 | |||||||
| 217 | 9 | 50 | 41 | my $rid = $self->{report_id} = $self->query( | |||
| 218 | $self->grammar->insert_report, | ||||||
| 219 | [ $from_dom_id, $meta->begin, $meta->end, $author_id, $meta->uuid ] | ||||||
| 220 | ) or return; | ||||||
| 221 | |||||||
| 222 | 9 | 293 | $self->insert_policy_published( $rid, $pol ); | ||||
| 223 | 9 | 66 | return $rid; | ||||
| 224 | } | ||||||
| 225 | |||||||
| 226 | sub get_report { | ||||||
| 227 | 4 | 4 | 0 | 879 | my ($self,@args) = @_; | ||
| 228 | 4 | 50 | 24 | croak "invalid parameters" if @args % 2; | |||
| 229 | 4 | 13 | my %args = @args; | ||||
| 230 | |||||||
| 231 | 4 | 15 | my $query = $self->grammar->select_report_query; | ||||
| 232 | 4 | 9 | my @params; | ||||
| 233 | 4 | 13 | my @known = qw/ r.id a.org_name fd.domain r.begin r.end /; | ||||
| 234 | 4 | 12 | my %known = map { $_ => 1 } @known; | ||||
| 20 | 62 | ||||||
| 235 | |||||||
| 236 | # TODO: allow custom search ops? 'searchOper' => 'eq', | ||||||
| 237 | 4 | 50 | 33 | 25 | if ( $args{searchField} && $known{ $args{searchField} } ) { | ||
| 238 | 0 | 0 | $query .= $self->grammar->and_arg($args{searchField}); | ||||
| 239 | 0 | 0 | push @params, $args{searchString}; | ||||
| 240 | }; | ||||||
| 241 | |||||||
| 242 | 4 | 11 | foreach my $known ( @known ) { | ||||
| 243 | 20 | 50 | 46 | next if ! defined $args{$known}; | |||
| 244 | 0 | 0 | $query .= $self->grammar->and_arg($known); | ||||
| 245 | 0 | 0 | push @params, $args{$known}; | ||||
| 246 | }; | ||||||
| 247 | 4 | 50 | 33 | 21 | if ( $args{sidx} && $known{$args{sidx}} ) { | ||
| 248 | 0 | 0 | 0 | if ( $args{sord} ) { | |||
| 249 | 0 | 0 | 0 | $query .= $self->grammar->order_by($args{sidx}, $args{sord} eq 'desc' ? ' DESC' : ' ASC'); | |||
| 250 | }; | ||||||
| 251 | }; | ||||||
| 252 | 4 | 12 | my $total_recs = $self->dbix->query($self->grammar->count_reports)->list; | ||||
| 253 | 4 | 837 | my $total_pages = 0; | ||||
| 254 | 4 | 100 | 470 | if ( $args{rows} ) { | |||
| 255 | 1 | 50 | 5 | if ( $args{page} ) { | |||
| 256 | 0 | 0 | $total_pages = POSIX::ceil($total_recs / $args{rows}); | ||||
| 257 | 0 | 0 | my $start = ($args{rows} * $args{page}) - $args{rows}; | ||||
| 258 | 0 | 0 | 0 | $start = 0 if $start < 0; | |||
| 259 | 0 | 0 | $query .= $self->grammar->limit_args(2); | ||||
| 260 | 0 | 0 | push @params, $start, $args{rows}; | ||||
| 261 | } | ||||||
| 262 | else { | ||||||
| 263 | 1 | 5 | $query .= $self->grammar->limit_args; | ||||
| 264 | 1 | 5 | push @params, $args{rows}; | ||||
| 265 | }; | ||||||
| 266 | }; | ||||||
| 267 | |||||||
| 268 | # warn "query: $query\n" . join(", ", @params) . "\n"; | ||||||
| 269 | 4 | 22 | my $reports = $self->query($query, \@params); | ||||
| 270 | 4 | 13 | foreach (@$reports ) { | ||||
| 271 | 12 | 40 | $_->{begin} = join(' ', split(/T/, $self->epoch_to_iso( $_->{begin} ))); |
||||
| 272 | 12 | 37 | $_->{end} = join(' ', split(/T/, $self->epoch_to_iso( $_->{end} ))); |
||||
| 273 | }; | ||||||
| 274 | # return in the format expected by jqGrid | ||||||
| 275 | return { | ||||||
| 276 | cur_page => $args{page}, | ||||||
| 277 | 4 | 51 | total_pages => $total_pages, | ||||
| 278 | total_rows => $total_recs, | ||||||
| 279 | rows => $reports, | ||||||
| 280 | }; | ||||||
| 281 | } | ||||||
| 282 | |||||||
| 283 | sub get_report_policy_published { | ||||||
| 284 | 6 | 6 | 0 | 613 | my ($self, $rid) = @_; | ||
| 285 | 6 | 22 | my $pp = $self->query($self->grammar->select_report_policy_published, [ $rid ] )->[0]; | ||||
| 286 | 6 | 50 | 31 | $pp->{p} ||= 'none'; | |||
| 287 | 6 | 71 | $pp = Mail::DMARC::Policy->new( v=>'DMARC1', %$pp ); | ||||
| 288 | 6 | 23 | return $pp; | ||||
| 289 | } | ||||||
| 290 | |||||||
| 291 | sub get_rr { | ||||||
| 292 | 0 | 0 | 0 | 0 | my ($self,@args) = @_; | ||
| 293 | 0 | 0 | 0 | croak "invalid parameters" if @args % 2; | |||
| 294 | 0 | 0 | my %args = @args; | ||||
| 295 | # warn Dumper(\%args); | ||||||
| 296 | 0 | 0 | 0 | croak "missing report ID (rid)!" if ! defined $args{rid}; | |||
| 297 | |||||||
| 298 | 0 | 0 | my $rows = $self->query( $self->grammar->select_rr_query, [ $args{rid} ] ); | ||||
| 299 | 0 | 0 | foreach ( @$rows ) { | ||||
| 300 | 0 | 0 | 0 | $_->{source_ip} = $self->any_inet_ntop( $_->{source_ip} ) if $self->grammar->language ne 'postgresql'; | |||
| 301 | 0 | 0 | $_->{reasons} = $self->query($self->grammar->select_report_reason, [ $_->{id} ] ); | ||||
| 302 | }; | ||||||
| 303 | return { | ||||||
| 304 | 0 | 0 | cur_page => 1, | ||||
| 305 | total_pages => 1, | ||||||
| 306 | total_rows => scalar @$rows, | ||||||
| 307 | rows => $rows, | ||||||
| 308 | }; | ||||||
| 309 | } | ||||||
| 310 | |||||||
| 311 | sub populate_agg_metadata { | ||||||
| 312 | 6 | 6 | 0 | 21 | my ($self, $agg_ref, $report_ref) = @_; | ||
| 313 | |||||||
| 314 | 6 | 40 | $$agg_ref->metadata->report_id( $$report_ref->{rid} ); | ||||
| 315 | |||||||
| 316 | 6 | 19 | foreach my $f ( qw/ org_name email extra_contact_info / ) { | ||||
| 317 | 18 | 42 | $$agg_ref->metadata->$f( $self->config->{organization}{$f} ); | ||||
| 318 | }; | ||||||
| 319 | 6 | 16 | foreach my $f ( qw/ begin end / ) { | ||||
| 320 | 12 | 30 | $$agg_ref->metadata->$f( $$report_ref->{$f} ); | ||||
| 321 | }; | ||||||
| 322 | |||||||
| 323 | my $errors = $self->query($self->grammar->select_report_error, | ||||||
| 324 | 6 | 21 | [ $$report_ref->{rid} ] | ||||
| 325 | ); | ||||||
| 326 | 6 | 27 | foreach ( @$errors ) { | ||||
| 327 | 0 | 0 | $$agg_ref->metadata->error( $_->{error} ); | ||||
| 328 | }; | ||||||
| 329 | 6 | 19 | return 1; | ||||
| 330 | } | ||||||
| 331 | |||||||
| 332 | sub populate_agg_records { | ||||||
| 333 | 6 | 6 | 0 | 25 | my ($self, $agg_ref, $rid) = @_; | ||
| 334 | |||||||
| 335 | 6 | 18 | my $recs = $self->query( $self->grammar->select_rr_query, [ $rid ] ); | ||||
| 336 | |||||||
| 337 | # aggregate the connections per IP-Disposition-DKIM-SPF uniqueness | ||||||
| 338 | 6 | 64 | my (%ips, %uniq, %pe, %auth, %ident, %reasons, %other); | ||||
| 339 | 6 | 21 | foreach my $rec ( @$recs ) { | ||||
| 340 | 6 | 19 | my $ip = $rec->{source_ip}; | ||||
| 341 | 6 | 50 | 22 | $ip = $self->any_inet_ntop($rec->{source_ip}) if $self->grammar->language ne 'postgresql'; | |||
| 342 | my $key = join('-', $ip, | ||||||
| 343 | 6 | 45 | @$rec{ qw/ disposition dkim spf / }); # hash slice | ||||
| 344 | 6 | 23 | $uniq{ $key }++; | ||||
| 345 | 6 | 16 | $ips{$key} = $rec->{source_ip}; | ||||
| 346 | 6 | 33 | 45 | $ident{$key}{header_from} ||= $rec->{header_from}; | |||
| 347 | 6 | 33 | 39 | $ident{$key}{envelope_from} ||= $rec->{envelope_from}; | |||
| 348 | 6 | 33 | 33 | $ident{$key}{envelope_to} ||= $rec->{envelope_to}; | |||
| 349 | |||||||
| 350 | 6 | 33 | 36 | $pe{$key}{disposition} ||= $rec->{disposition}; | |||
| 351 | 6 | 33 | 31 | $pe{$key}{dkim} ||= $rec->{dkim}; | |||
| 352 | 6 | 33 | 31 | $pe{$key}{spf} ||= $rec->{spf}; | |||
| 353 | |||||||
| 354 | 6 | 33 | 41 | $auth{$key}{spf} ||= $self->get_row_spf($rec->{id}); | |||
| 355 | 6 | 33 | 46 | $auth{$key}{dkim} ||= $self->get_row_dkim($rec->{id}); | |||
| 356 | |||||||
| 357 | 6 | 28 | my $reasons = $self->get_row_reason( $rec->{id} ); | ||||
| 358 | 6 | 28 | foreach my $reason ( @$reasons ) { | ||||
| 359 | 12 | 50 | 26 | my $type = $reason->{type} or next; | |||
| 360 | 12 | 39 | $reasons{$key}{$type} = $reason->{comment}; # flatten reasons | ||||
| 361 | } | ||||||
| 362 | } | ||||||
| 363 | |||||||
| 364 | 6 | 21 | foreach my $u ( keys %uniq ) { | ||||
| 365 | my $record = Mail::DMARC::Report::Aggregate::Record->new( | ||||||
| 366 | identifiers => $ident{$u}, | ||||||
| 367 | auth_results => $auth{$u}, | ||||||
| 368 | row => { | ||||||
| 369 | source_ip => $self->grammar->language eq 'postgresql' ? $ips{$u} : $self->any_inet_ntop( $ips{$u} ), | ||||||
| 370 | count => $uniq{ $u }, | ||||||
| 371 | policy_evaluated => { | ||||||
| 372 | 6 | 85 | %{ $pe{$u} }, | ||||
| 373 | 6 | 50 | 31 | $reasons{$u} ? ( reason => [ map { { type => $_, comment => $reasons{$u}{$_} } } sort keys %{ $reasons{$u} } ] ) : (), | |||
| 12 | 100 | 48 | |||||
| 2 | 17 | ||||||
| 374 | }, | ||||||
| 375 | } | ||||||
| 376 | ); | ||||||
| 377 | 6 | 48 | $$agg_ref->record( $record ); | ||||
| 378 | } | ||||||
| 379 | 6 | 21 | return $$agg_ref->record; | ||||
| 380 | } | ||||||
| 381 | |||||||
| 382 | sub row_exists { | ||||||
| 383 | 5 | 5 | 0 | 18 | my ($self, $rid, $rec ) = @_; | ||
| 384 | |||||||
| 385 | 5 | 50 | 53 | if ( ! defined $rec->{row}{count} ) { | |||
| 386 | 5 | 50 | 38 | print "new record\n" if $self->verbose; | |||
| 387 | 5 | 26 | return; | ||||
| 388 | }; | ||||||
| 389 | |||||||
| 390 | my $rows = $self->query( | ||||||
| 391 | $self->grammar->select_report_record, | ||||||
| 392 | 0 | 0 | [ $rid, $rec->{row}{source_ip}, $rec->{row}{count}, ] | ||||
| 393 | ); | ||||||
| 394 | |||||||
| 395 | 0 | 0 | 0 | return 1 if scalar @$rows; | |||
| 396 | 0 | 0 | return; | ||||
| 397 | } | ||||||
| 398 | |||||||
| 399 | sub insert_agg_record { | ||||||
| 400 | 5 | 5 | 0 | 22 | my ($self, $row_id, $rec) = @_; | ||
| 401 | |||||||
| 402 | 5 | 50 | 29 | return 1 if $self->row_exists( $row_id, $rec); | |||
| 403 | |||||||
| 404 | 5 | 50 | 29 | $row_id = $self->insert_rr( $row_id, $rec ) | |||
| 405 | or croak "failed to insert report row"; | ||||||
| 406 | |||||||
| 407 | 5 | 28 | my $reasons = $rec->row->policy_evaluated->reason; | ||||
| 408 | 5 | 50 | 48 | if ( $reasons ) { | |||
| 409 | 5 | 21 | foreach my $reason ( @$reasons ) { | ||||
| 410 | 2 | 50 | 33 | 29 | next if !$reason || !$reason->{type}; | ||
| 411 | 2 | 13 | $self->insert_rr_reason( $row_id, $reason->{type}, $reason->{comment} ); | ||||
| 412 | }; | ||||||
| 413 | } | ||||||
| 414 | |||||||
| 415 | 5 | 60 | my $spf_ref = $rec->auth_results->spf; | ||||
| 416 | 5 | 50 | 20 | if ( $spf_ref ) { | |||
| 417 | 5 | 18 | foreach my $spf (@$spf_ref) { | ||||
| 418 | 10 | 61 | $self->insert_rr_spf( $row_id, $spf ); | ||||
| 419 | } | ||||||
| 420 | } | ||||||
| 421 | |||||||
| 422 | 5 | 44 | my $dkim = $rec->auth_results->dkim; | ||||
| 423 | 5 | 50 | 23 | if ($dkim) { | |||
| 424 | 5 | 22 | foreach my $sig (@$dkim) { | ||||
| 425 | 5 | 50 | 33 | 46 | next if ! $sig || ! $sig->{domain}; | ||
| 426 | 5 | 31 | $self->insert_rr_dkim( $row_id, $sig ); | ||||
| 427 | } | ||||||
| 428 | } | ||||||
| 429 | 5 | 25 | return 1; | ||||
| 430 | } | ||||||
| 431 | |||||||
| 432 | sub insert_error { | ||||||
| 433 | 1 | 1 | 0 | 4 | my ( $self, $rid, $error ) = @_; | ||
| 434 | # wait >5m before trying to deliver this report again | ||||||
| 435 | 1 | 6 | $self->query($self->grammar->insert_error(0), [$self->time + (5*60), $rid]); | ||||
| 436 | |||||||
| 437 | 1 | 6 | return $self->query( | ||||
| 438 | $self->grammar->insert_error(1), | ||||||
| 439 | [ $rid, $error ] | ||||||
| 440 | ); | ||||||
| 441 | } | ||||||
| 442 | |||||||
| 443 | sub insert_rr_reason { | ||||||
| 444 | 8 | 8 | 0 | 6195 | my ( $self, $row_id, $type, $comment ) = @_; | ||
| 445 | 8 | 100 | 35 | return $self->query( | |||
| 446 | $self->grammar->insert_rr_reason, | ||||||
| 447 | [ $row_id, $type, ($comment || '') ] | ||||||
| 448 | ); | ||||||
| 449 | } | ||||||
| 450 | |||||||
| 451 | sub insert_rr_dkim { | ||||||
| 452 | 8 | 8 | 0 | 1454 | my ( $self, $row_id, $dkim ) = @_; | ||
| 453 | 8 | 20 | my (@fields, @values); | ||||
| 454 | 8 | 25 | foreach ( qw/ domain selector result human_result / ) { | ||||
| 455 | 32 | 100 | 96 | next if ! defined $dkim->{$_}; | |||
| 456 | 30 | 100 | 70 | if ( 'domain' eq $_ ) { | |||
| 457 | 8 | 24 | push @fields, 'domain_id'; | ||||
| 458 | 8 | 35 | push @values, $self->get_domain_id( $dkim->{domain} ); | ||||
| 459 | 8 | 29 | next; | ||||
| 460 | }; | ||||||
| 461 | 22 | 45 | push @fields, $_; | ||||
| 462 | 22 | 76 | push @values, $dkim->{$_}; | ||||
| 463 | }; | ||||||
| 464 | 8 | 24 | my $query = $self->grammar->insert_rr_dkim(\@fields); | ||||
| 465 | 8 | 41 | $self->query( $query, [ $row_id, @values ] ); | ||||
| 466 | 8 | 256 | return 1; | ||||
| 467 | } | ||||||
| 468 | |||||||
| 469 | sub insert_rr_spf { | ||||||
| 470 | 13 | 13 | 0 | 4768 | my ( $self, $row_id, $spf ) = @_; | ||
| 471 | 13 | 42 | my (@fields, @values); | ||||
| 472 | 13 | 46 | for ( qw/ domain scope result / ) { | ||||
| 473 | 39 | 50 | 172 | next if ! defined $spf->{$_}; | |||
| 474 | 39 | 100 | 117 | if ( 'domain' eq $_ ) { | |||
| 475 | 13 | 41 | push @fields, 'domain_id'; | ||||
| 476 | 13 | 60 | push @values, $self->get_domain_id( $spf->{domain} ); | ||||
| 477 | 13 | 304 | next; | ||||
| 478 | }; | ||||||
| 479 | 26 | 63 | push @fields, $_; | ||||
| 480 | 26 | 94 | push @values, $spf->{$_}; | ||||
| 481 | }; | ||||||
| 482 | 13 | 53 | my $query = $self->grammar->insert_rr_spf(\@fields); | ||||
| 483 | 13 | 84 | $self->query( $query, [ $row_id, @values ]); | ||||
| 484 | 13 | 366 | return 1; | ||||
| 485 | } | ||||||
| 486 | |||||||
| 487 | sub insert_rr { | ||||||
| 488 | 6 | 6 | 0 | 32 | my ( $self, $report_id, $rec ) = @_; | ||
| 489 | 6 | 50 | 24 | $report_id or croak "report ID required?!"; | |||
| 490 | 6 | 24 | my $query = $self->grammar->insert_rr; | ||||
| 491 | |||||||
| 492 | 6 | 51 | my $ip = $rec->row->source_ip; | ||||
| 493 | 6 | 50 | 28 | $ip = $self->any_inet_pton( $ip ) if $self->grammar->language ne 'postgresql'; | |||
| 494 | my @args = ( $report_id, | ||||||
| 495 | $ip, | ||||||
| 496 | $rec->{row}{count}, | ||||||
| 497 | 6 | 39 | ); | ||||
| 498 | 6 | 24 | foreach my $f ( qw/ header_from envelope_to envelope_from / ) { | ||||
| 499 | 18 | 50 | 167 | push @args, $rec->identifiers->$f ? | |||
| 500 | $self->get_domain_id( $rec->identifiers->$f ) : undef; | ||||||
| 501 | }; | ||||||
| 502 | 6 | 51 | push @args, map { $rec->row->policy_evaluated->$_ } qw/ disposition dkim spf /; | ||||
| 18 | 63 | ||||||
| 503 | 6 | 50 | 33 | my $rr_id = $self->query( $query, \@args ) or croak; | |||
| 504 | 6 | 172 | return $self->{report_row_id} = $rr_id; | ||||
| 505 | } | ||||||
| 506 | |||||||
| 507 | sub insert_policy_published { | ||||||
| 508 | 10 | 10 | 0 | 39 | my ( $self, $id, $pub ) = @_; | ||
| 509 | 10 | 47 | my $query = $self->grammar->insert_policy_published; | ||||
| 510 | $self->query( $query, | ||||||
| 511 | 10 | 109 | [ $id, @$pub{ qw/ adkim aspf p sp pct rua /} ] | ||||
| 512 | ); | ||||||
| 513 | 10 | 257 | return 1; | ||||
| 514 | } | ||||||
| 515 | |||||||
| 516 | sub db_connect { | ||||||
| 517 | 631 | 631 | 0 | 1867 | my $self = shift; | ||
| 518 | |||||||
| 519 | 631 | 50 | 2033 | my $dsn = $self->config->{report_store}{dsn} or croak; | |||
| 520 | 631 | 9209 | my $user = $self->config->{report_store}{user}; | ||||
| 521 | 631 | 1356 | my $pass = $self->config->{report_store}{pass}; | ||||
| 522 | |||||||
| 523 | # cacheing | ||||||
| 524 | 631 | 50 | 66 | 2868 | if ($self->{grammar} && $self->{dbix}) { | ||
| 525 | 621 | 1801 | my $cached_grammar_type = $self->{grammar}->dsn; | ||||
| 526 | 621 | 50 | 3192 | if ( $dsn =~ /$cached_grammar_type/ ) { | |||
| 527 | 621 | 1940 | return $self->{dbix}; # caching | ||||
| 528 | } | ||||||
| 529 | } | ||||||
| 530 | |||||||
| 531 | 10 | 19 | my $needs_tables; | ||||
| 532 | |||||||
| 533 | 10 | 21 | $self->{grammar} = undef; | ||||
| 534 | 10 | 50 | 62 | if ($dsn =~ /sqlite/i) { | |||
| 0 | |||||||
| 0 | |||||||
| 535 | 10 | 100 | my ($db) = ( split /=/, $dsn )[-1]; | ||||
| 536 | 10 | 100 | 33 | 391 | if ( !$db || $db eq ':memory:' || !-e $db ) { | ||
| 66 | |||||||
| 537 | 6 | 21 | my $schema = 'mail_dmarc_schema.sqlite'; | ||||
| 538 | 6 | 50 | 23 | $needs_tables = $self->get_db_schema($schema) | |||
| 539 | or croak | ||||||
| 540 | "can't locate DB $db AND can't find $schema! Create $db manually.\n"; | ||||||
| 541 | } | ||||||
| 542 | 10 | 135 | $self->{grammar} = Mail::DMARC::Report::Store::SQL::Grammars::SQLite->new(); | ||||
| 543 | } elsif ($dsn =~ /mysql/i) { | ||||||
| 544 | 0 | 0 | $self->{grammar} = Mail::DMARC::Report::Store::SQL::Grammars::MySQL->new(); | ||||
| 545 | } elsif ($dsn =~ /pg/i) { | ||||||
| 546 | 0 | 0 | $self->{grammar} = Mail::DMARC::Report::Store::SQL::Grammars::PostgreSQL->new(); | ||||
| 547 | } else { | ||||||
| 548 | 0 | 0 | croak "can't determine database type, so unable to load grammar.\n"; | ||||
| 549 | } | ||||||
| 550 | |||||||
| 551 | 10 | 50 | 89 | $self->{dbix} = DBIx::Simple->connect( $dsn, $user, $pass ) | |||
| 552 | or return $self->error( DBIx::Simple->error ); | ||||||
| 553 | |||||||
| 554 | 10 | 100 | 31402 | if ($needs_tables) { | |||
| 555 | 6 | 20 | $self->apply_db_schema($needs_tables); | ||||
| 556 | } | ||||||
| 557 | |||||||
| 558 | 10 | 49 | return $self->{dbix}; | ||||
| 559 | } | ||||||
| 560 | |||||||
| 561 | sub db_check_err { | ||||||
| 562 | 250 | 250 | 0 | 38128 | my ( $self, $err ) = @_; | ||
| 563 | ## no critic (PackageVars) | ||||||
| 564 | 250 | 100 | 2666 | return if !defined $DBI::errstr; | |||
| 565 | 1 | 50 | 8 | return if !$DBI::errstr; | |||
| 566 | 1 | 50 | 6 | return if $DBI::errstr eq 'DBI error: '; | |||
| 567 | 1 | 99 | croak $err . $DBI::errstr; | ||||
| 568 | } | ||||||
| 569 | |||||||
| 570 | 722 | 50 | 722 | 0 | 4770 | sub dbix { return $_[0]->{dbix} if $_[0]->{dbix}; return $_[0]->db_connect(); } | |
| 0 | 0 | ||||||
| 571 | |||||||
| 572 | sub apply_db_schema { | ||||||
| 573 | 6 | 6 | 0 | 18 | my ( $self, $file ) = @_; | ||
| 574 | 6 | 45 | my $setup = $self->slurp($file); | ||||
| 575 | 6 | 139 | foreach ( split /;/, $setup ) { | ||||
| 576 | # warn "$_\n"; | ||||||
| 577 | 336 | 3363302 | $self->dbix->query($_); | ||||
| 578 | } | ||||||
| 579 | 6 | 1765 | return; | ||||
| 580 | } | ||||||
| 581 | |||||||
| 582 | sub get_db_schema { | ||||||
| 583 | 6 | 6 | 0 | 14 | my ( $self, $file ) = @_; | ||
| 584 | 6 | 50 | 164 | return "share/$file" if -f "share/$file"; # when testing | |||
| 585 | 0 | 0 | return File::ShareDir::dist_file( 'Mail-DMARC', $file ); # when installed | ||||
| 586 | } | ||||||
| 587 | |||||||
| 588 | sub query { | ||||||
| 589 | 290 | 290 | 0 | 3214 | my ( $self, $query, $params, @extra ) = @_; | ||
| 590 | |||||||
| 591 | 290 | 1019 | my @c = caller; | ||||
| 592 | 290 | 1968 | my $err = sprintf( "query called by %s, %s\n", $c[0], $c[2] ) | ||||
| 593 | . "\t$query\n\t"; | ||||||
| 594 | |||||||
| 595 | 290 | 493 | my @params; | ||||
| 596 | 290 | 100 | 800 | if ( defined $params ) { | |||
| 597 | 246 | 50 | 975 | @params = ref $params eq 'ARRAY' ? @$params : $params; | |||
| 598 | 4 | 4 | 48 | no warnings; ## no critic (NoWarnings) | |||
| 4 | 9 | ||||||
| 4 | 3381 | ||||||
| 599 | 246 | 899 | $err .= join( ', ', @params ); | ||||
| 600 | } | ||||||
| 601 | |||||||
| 602 | 290 | 50 | 724 | croak "too many arguments to exec_query!" if @extra; | |||
| 603 | |||||||
| 604 | 290 | 50 | 620 | my $dbix = $self->db_connect() or croak DBIx::Simple->error; | |||
| 605 | |||||||
| 606 | 290 | 100 | 1599 | return $self->query_insert( $query, $err, @params ) if $query =~ /^INSERT/ix; | |||
| 607 | 196 | 100 | 738 | return $self->query_replace( $query, $err, @params ) if $query =~ /^(?:REPLACE|UPDATE)/ix; | |||
| 608 | 194 | 100 | 757 | return $self->query_delete( $query, $err, @params ) if $query =~ /^(?:DELETE|TRUNCATE)/ix; | |||
| 609 | 145 | 425 | return $self->query_any( $query, $err, @params ); | ||||
| 610 | } | ||||||
| 611 | |||||||
| 612 | sub query_any { | ||||||
| 613 | 145 | 145 | 0 | 523 | my ( $self, $query, $err, @params ) = @_; | ||
| 614 | # warn "query: $query\n" . join(", ", @params) . "\n"; | ||||||
| 615 | 145 | 248 | my $r; | ||||
| 616 | 145 | 100 | 251 | eval { $r = $self->dbix->query( $query, @params )->hashes; } or print ''; | |||
| 145 | 368 | ||||||
| 617 | 145 | 43612 | $self->db_check_err($err); | ||||
| 618 | 144 | 50 | 450 | die "something went wrong with: $err\n" if ! $r; ## no critic (Carp) | |||
| 619 | 144 | 664 | return $r; | ||||
| 620 | } | ||||||
| 621 | |||||||
| 622 | sub query_insert { | ||||||
| 623 | 94 | 94 | 0 | 407 | my ( $self, $query, $err, @params ) = @_; | ||
| 624 | 94 | 100 | 178 | eval { $self->dbix->query( $query, @params ) } or do { | |||
| 94 | 255 | ||||||
| 625 | 2 | 1386 | warn DBIx::Simple->error . "\n"; | ||||
| 626 | 2 | 372 | croak $err; | ||||
| 627 | }; | ||||||
| 628 | 92 | 1205058 | $self->db_check_err($err); | ||||
| 629 | |||||||
| 630 | # If the table has no autoincrement field, last_insert_id is zero | ||||||
| 631 | 92 | 1433 | my ( undef, undef, $table ) = split /\s+/, $query; | ||||
| 632 | 92 | 50 | 690 | ($table) = split( /\(/, $table ) if $table =~ /\(/; | |||
| 633 | 92 | 646 | $table =~ s/^"|"$//g; | ||||
| 634 | 92 | 50 | 392 | croak "unable to determine table in query: $query" if !$table; | |||
| 635 | 92 | 384 | return $self->dbix->last_insert_id( undef, undef, $table, undef ); | ||||
| 636 | } | ||||||
| 637 | |||||||
| 638 | sub query_replace { | ||||||
| 639 | 2 | 2 | 0 | 13 | my ( $self, $query, $err, @params ) = @_; | ||
| 640 | 2 | 50 | 8 | $self->dbix->query( $query, @params ) or croak $err; | |||
| 641 | 1 | 14571 | $self->db_check_err($err); | ||||
| 642 | 1 | 6 | return 1; # sorry, no indication of success | ||||
| 643 | } | ||||||
| 644 | |||||||
| 645 | sub query_delete { | ||||||
| 646 | 49 | 49 | 0 | 133 | my ( $self, $query, $err, @params ) = @_; | ||
| 647 | 49 | 100 | 109 | my $affected = $self->dbix->query( $query, @params )->rows or croak $err; | |||
| 648 | 12 | 161377 | $self->db_check_err($err); | ||||
| 649 | 12 | 123 | return $affected; | ||||
| 650 | } | ||||||
| 651 | |||||||
| 652 | sub get_row_spf { | ||||||
| 653 | 7 | 7 | 0 | 548 | my ($self, $rowid) = @_; | ||
| 654 | 7 | 23 | return $self->query( $self->grammar->select_row_spf, [ $rowid ] ); | ||||
| 655 | } | ||||||
| 656 | |||||||
| 657 | sub get_row_dkim { | ||||||
| 658 | 7 | 7 | 0 | 481 | my ($self, $rowid) = @_; | ||
| 659 | 7 | 20 | return $self->query( $self->grammar->select_row_dkim, [ $rowid ] ); | ||||
| 660 | } | ||||||
| 661 | |||||||
| 662 | sub get_row_reason { | ||||||
| 663 | 7 | 7 | 0 | 597 | my ($self, $rowid) = @_; | ||
| 664 | 7 | 22 | return $self->query( $self->grammar->select_row_reason, [ $rowid ] ); | ||||
| 665 | } | ||||||
| 666 | |||||||
| 667 | sub grammar { | ||||||
| 668 | 335 | 335 | 0 | 22907 | my $self = shift; | ||
| 669 | 335 | 1062 | $self->db_connect(); | ||||
| 670 | 335 | 1577 | return $self->{grammar}; | ||||
| 671 | } | ||||||
| 672 | |||||||
| 673 | 1; | ||||||
| 674 | |||||||
| 675 | __END__ |