File Coverage

bin/loghack
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             # Copyright (C) 2007 Eric L. Wilhelm
4              
5 1     1   1236 use warnings;
  1         2  
  1         31  
6 1     1   6 use strict;
  1         2  
  1         47  
7              
8             =head1 NAME
9              
10             loghack - process and query apache logs
11              
12             =cut
13              
14             package bin::loghack;
15              
16 1     1   4 use ApacheLog::Parser qw(parse_line :fields);
  1         2  
  1         12  
17 1     1   5 use ApacheLog::Parser::SkipList;
  1         2  
  1         26  
18              
19 1     1   434 use Getopt::Helpful;
  0            
  0            
20              
21             use YAML ();
22             use POSIX ();
23             use File::Basename qw(basename dirname);
24             use Digest::MD5 ();
25              
26             use Date::Piece qw(date months weeks);
27              
28             use Carp;
29              
30             BEGIN {
31             unless($ENV{HOSTNAME}) {
32             if(open(my $fh, '<', '/etc/hostname')) {
33             chomp(my $line = <$fh>);
34             $ENV{HOSTNAME} = $line;
35             }
36             }
37             }
38             $SIG{CHLD} = sub {
39             my $child;
40             while(($child = waitpid(-1, POSIX::WNOHANG())) > 0) {
41             # XXX this dies before we get a cluster child's output?
42             if($?) {
43             my $code = $? >> 8;
44             my $sig = $? & 127;
45             die "$child status: $? ($code/$sig)";
46             }
47             }
48             };
49              
50             sub open_file {
51             my ($file) = @_;
52              
53             # better than IO::Zlib because it leverages multiple cores, duh
54             if($file =~ m/\.(gz|bz2)$/) {
55             my $ext = $1;
56             my %prog = (
57             gz => 'gunzip',
58             bz2 => 'bunzip2',
59             );
60             my $prog = $prog{$ext} or die "cannot read $ext files";
61             my $pid = open(my $fh, '-|');
62             unless($pid) {
63             local $SIG{CHLD};
64             open(STDIN, '<', $file) or die "cannot open $file $!";
65             exec($prog, '-c') or die "ack $!";
66             }
67             #warn "launch $prog < $file on $pid\n";
68             return($fh);
69             }
70             else {
71             open(my $fh, '<', $file) or die "cannot open '$file' $!";
72             return($fh);
73             }
74             }
75              
76             sub pipe_out {
77             my ($file) = @_;
78             $file =~ m/\.(gz|bz2)$/ or die "unknown extension on $file";
79             my $ext = $1;
80             my %prog = (
81             gz => 'gzip',
82             bz2 => 'bzip2',
83             );
84             my $prog = $prog{$ext} or die "cannot write $ext files";
85              
86             my $pid = open(my $fh, '|-');
87             unless($pid) {
88             local $SIG{CHLD};
89             open(STDOUT, '>', $file) or die "cannot write '$file' $!";
90             exec($prog, '-c') or die "ack $!";
91             }
92             #warn "launch $prog > $file on $pid\n";
93             return($fh);
94             }
95              
96             sub main {
97             my (@args) = @_;
98              
99             my %o = (
100             archive => '',
101             repository => '',
102             missok => 0,
103             daemon => '',
104             cluster => '',
105             skip => 1,
106             quiet => 0,
107             );
108             my $hopt = Getopt::Helpful->new(
109             usage => 'CALLER [options] ',
110             ['a|archive=s', \$o{archive}, '', 'archive dir'],
111             ['r|repository=s', \$o{repository}, '', 'repository dir'],
112             ['missok', \$o{missok}, '', 'skip missing files'],
113             ['d|daemon=s', \$o{daemon}, '', 'daemon mode - needs chdir'],
114             ['c|cluster=s', \$o{cluster}, '', 'cluster mode'],
115             ['s|skip!', \$o{skip}, '', 'use skipper (default yes)'],
116             ['q|quiet', \$o{quiet}, '', 'suppress status'],
117             '+help',
118             );
119             $hopt->Get_from(\@args);
120              
121             if(not $o{repository}) {
122             $o{repository} = '.' if(-e '.config');
123             }
124              
125             if($o{daemon}) {
126             daemon(\%o, @args);
127             exit;
128             }
129              
130             my %modes = map({$_ => 1} qw(
131             makelinks
132             import
133             prep check sweep verify confirm list
134             unique day_unique month_unique month_unique2
135             compile
136             aggregate report date dump tabulate count reskip
137             ));
138             my $mode = shift(@args);
139             $modes{$mode} or die "USAGE: mode must be one of ",
140             join(", ", sort(keys(%modes))), "\n";
141              
142             # TODO deal with the do_ stuff
143             if($o{cluster}) {
144             cluster(\%o, $mode, @args);
145             }
146             else {
147             my $run = __PACKAGE__->can('do_' . $mode) or
148             die "cannot find method 'do_$mode'";
149             $run->(\%o, @args);
150             }
151             }
152              
153             sub name_as_date {
154             my ($n) = @_;
155             $n =~ s/(?:.*\.)?(\d{4}-\d{2}-\d{2})\..*/$1/ or
156             croak("weird name -- $n");
157             $n =~ s#.*/##;
158             return($n);
159             }
160             sub nice_name {
161             my ($name) = @_;
162             my @d = split(/\/+/, $name);
163             my $n = '*.' . name_as_date(pop(@d)) . '.*';
164             @d or return($n);
165             return(join("/", $d[-1], $n));
166             }
167             sub record_source {
168             my ($opt, $file, $dir, $md5) = @_;
169              
170             my $writefile = $dir.$md5;
171              
172             want_dir($dir);
173              
174             if(-e $writefile) {
175             warn "skipping $writefile ($file)\n";
176             return;
177             }
178             open(my $fh, '>', $writefile) or die "cannot write '$writefile' $!";
179             print $fh File::Basename::basename($file), "\n";
180             close($fh) or die "cannot write '$writefile' $!";
181             }
182             sub want_dir {
183             my ($dir) = @_;
184              
185             return if(-d $dir);
186              
187             unless(mkdir($dir)) {
188             die "cannot create $dir $!" unless(-d $dir);
189             }
190             }
191              
192             sub daemon {
193             my ($opt, @args) = @_;
194              
195             my $dir = $opt->{daemon};
196             chdir($dir) or die "no such dir $dir\n";
197             $| = 1;
198             while(my $line = ) {
199             chomp($line);
200             main(split(/\t/, $line));
201             print ".done\n";
202             #warn "done\n";
203             #die ".done\n";
204             #sleep(1);
205             }
206             }
207              
208             sub start_cluster {
209             my ($dir, @hosts) = @_;
210             require IPC::Open3;
211             require IO::Select;
212             require IO::Handle;
213             my $sel = IO::Select->new();
214             my %track;
215             my $prog = basename($0);
216             foreach my $host (@hosts) {
217             (my $realhost = $host) =~ s/#\d+$//;
218             my $stdin;
219             my ($stdout, $stderr) = map({IO::Handle->new} 1..3);
220             my $pid = IPC::Open3::open3(
221             $stdin, $stdout, $stderr,
222             ($realhost eq 'localhost' ? () : ('ssh', $realhost)),
223             $prog, '-d', $dir
224             );
225             #warn "started $pid to $host";
226             $stdout->autoflush;
227             $stderr->autoflush;
228             $pid or die "gah no pid\n";
229             #warn "$stdin, $stdout, $stderr";
230             $track{$pid} = my $obj = {
231             stdin => $stdin,
232             stdout => $stdout,
233             stderr => $stderr,
234             host => $host,
235             };
236             $sel->add($obj->{sel_o} = [$stdout, $pid, 'stdout']);
237             $sel->add($obj->{sel_e} = [$stderr, $pid, 'stderr']);
238             }
239             return($sel, %track);
240             }
241             my $lglob = sub {
242             my ($opt, @spec) = @_;
243             local $opt->{lazy_glob} = 1;
244             return(repo_files($opt, @spec));
245             };
246             my $datethru = sub {shift(@_); _date_dwim(@_) };
247             my %cluster_fspec = (
248             report => $lglob,
249             compile => $datethru,
250             unique => $lglob,
251             day_unique => $datethru,
252             month_unique => sub {shift(@_); @_},
253             month_unique2 => sub {$_[1]},
254             );
255             sub cluster {
256             my ($opt, $mode, @files) = @_;
257              
258             require Cwd;
259             my $dir = Cwd::abs_path($opt->{repository});
260              
261             if(my $code = $cluster_fspec{$mode}) {
262             @files = $code->($opt, @files);
263             #die join("\n ", 'files', @files);
264             }
265             else {
266             @files = repo_files($opt, @files);
267             foreach my $file (@files) {
268             my $msg = "missing $file\n";
269             (-e $file) or $opt->{missok} ? warn $msg : die $msg;
270             }
271             }
272              
273             my @hosts = map({my ($h, $n) = split(/:/, $_);
274             ($n ? map({$h.'#'.$_} 1..$n) : $h)
275             } split(/, ?/, $opt->{cluster}));
276              
277             if(@hosts > @files) { # XXX weighting?
278             warn "that would get boring\n";
279             $#hosts = $#files;
280             }
281             my ($sel, %track) = start_cluster($dir, @hosts);
282              
283             my $hlen = 0;
284             foreach my $host (@hosts) {
285             my $l = length($host);
286             $hlen = $l if($l > $hlen);
287             }
288              
289             #die map({"$_ => " . join(", ", %{$track{$_}})} keys(%track));
290             my %hmap = map({$track{$_}{host} => $_} keys(%track));
291             my %sels = map({$track{$_->[1]}{host} => $_} $sel->handles);
292              
293             my %blacklist;
294             my $output = sub {
295             my ($host, $which, @lines) = @_;
296             my $pref = ($which eq 'stderr' ? '!' : '#');
297             printf("%-${hlen}s %s %s", $host, $pref, $_) for(@lines);
298             };
299             my $end_host = sub {
300             my ($host) = @_;
301              
302             my $pid = delete($hmap{$host}) or die "no pid at $host";
303             my $obj = delete($track{$pid});
304              
305             warn ' 'x($hlen+1), "closing $host\n";
306             close($obj->{stdin});
307             my $errh = $obj->{stdout};
308             local $SIG{ALRM} = sub { warn "no stderr on $host\n"};
309             alarm(2);
310             $output->($host, 'stderr', <$errh>);
311             alarm(0);
312             $sel->remove(delete($obj->{sel_o})) or die;
313             #$errh->blocking(0);
314             $sel->remove(delete($obj->{sel_e})) or die;
315             };
316             my $fill_host = sub {
317             my ($host) = @_;
318              
319             if($blacklist{$host}) {
320             warn "$host is blacklisted\n";
321             eval { $end_host->($host) };
322             return;
323             }
324              
325             my $pid = $hmap{$host} or die "no pid at $host";
326             my $obj = $track{$pid};
327             my $fh = $obj->{stdin};
328              
329             unless(@files) {
330             $end_host->($host);
331             return;
332             }
333              
334             my $file = shift(@files);
335             #warn "fill $host with $file\n";
336             if($opt->{missok} and not -e $file) {
337             warn "still missing '$file'\n";
338             my @later = ($file);
339             while($file = shift(@files)) {
340             if(-e $file) {
341             push(@files, @later);
342             last;
343             }
344             else {
345             warn "still missing '$file'\n";
346             push(@later, $file);
347             }
348             }
349             $file or die "out of files to use while waiting\n";
350             # grr, needs a loop
351             }
352             #warn "send $host $mode\t$file\n";
353             print $fh "$mode\t$file\n";
354             };
355             local $SIG{CHLD} = sub {
356             my $child;
357             while(($child = waitpid(-1, POSIX::WNOHANG())) > 0) {
358             if($?) {
359             my $code = $? >> 8;
360             my $sig = $? & 127;
361             my $host = $track{$child}{host};
362             warn " error $host ($child) status: $? ($code/$sig)\n";
363             $blacklist{$host} = 1;
364             $end_host->($host);
365             }
366             }
367             };
368              
369              
370             # go!
371             $fill_host->($_) for(@hosts);
372              
373             my %f = (stderr => 0, stdout => 1);
374             while($sel->count) {
375             READ: while(my @ready = $sel->can_read) {
376             @ready = sort({$f{$a->[2]} <=> $f{$b->[2]}} @ready);
377              
378             foreach my $bit (@ready) {
379             my ($fh, $pid, $which) = @$bit;
380             my $obj = $track{$pid};
381             my $host = $obj->{host};
382             $fh->blocking(0);
383             until(eof($fh)) {
384             my $line = <$fh>;
385              
386             # XXX probably never need this bit
387             unless(defined($line)) {
388             warn "undef line from $host\n";
389             $sel->remove($bit);
390             last;
391             }
392             # TODO handle death
393             if(($which eq 'stdout') and ($line =~ m/^.done$/)) {
394             #warn "$host said done\n";
395             $fill_host->($host);
396             last;
397             }
398             $output->($host, $which, $line);
399             }
400             $fh->blocking(1);
401             }
402             }
403             #warn "twiddling\n";
404             }
405             if(@files) {
406             die "ACK all my hosts died! (",
407             scalar(@files), " files left to process.)\n";
408             }
409             }
410              
411             =head2 reskip
412              
413             Regenerate the skiplist for a given chunk.
414              
415             =cut
416              
417             sub do_reskip {
418             my ($opt, @files) = @_;
419              
420             @files = repo_files($opt, @files);
421              
422             my $skipper = get_skipper($opt);
423             my $doskip = $skipper->get_matcher;
424             foreach my $file (@files) {
425             unless(-e $file) {
426             die "no such file:\n $file\n";
427             }
428             my $fh = open_file($file);
429              
430             my $nicename = nice_name($file);
431             my $start = time;
432             print "$nicename -- ",
433             sprintf("%02d:%02d:%02d", (localtime($start))[2,1,0]), "\n";
434              
435             my $skipfile = skipfilename($opt, $file);
436             my $sw = $skipper->new_writer($skipfile);
437             my $lnum = 0;
438             while(my $line = <$fh>) {
439             $lnum++;
440             chomp($line);
441             my @v = split(/\t/, $line);
442             # create skiplist
443             if($doskip and $doskip->(\@v)) {$sw->skip($lnum);}
444             }
445             }
446              
447             }
448              
449             =begin notes
450              
451             The files are split per-hour. Time zone adjustments are going to be an
452             issue. There's also a potential race condition between two nodes, so
453             the outputs will always have a ".$chunk" appended to them. The value of
454             $part is either 0 or 1 (and only switched to 1 at the start of the file.)
455              
456             And another issue: delay. The request init time is what's shown, but
457             it doesn't get logged until the request completes. So a 10min request
458             will not appear until 10min later. If there are any large downloads,
459             they could possibly even span a couple of logrotates.
460              
461             This also means that tomorrow or the next day could concievably hold a
462             bit of data from a big download that started 24+ hours ago. In
463             practice, logrotate is actually just disposing of this data when it runs
464             gzip. That is, a request always goes in the logfile that was open when
465             the apache process spawned?
466              
467             Still need to figure out the cleanup pass. Add the skiplists together
468             (and/or rename them), figure out where to tie-off the last item, etc.
469             Probably need some tracking of sources and/or chunks. Chunks can
470             probably be treated as closed until further notice as long as a
471             chunkcount file is maintained somewhere.
472              
473             =end notes
474              
475             =begin tznotes
476              
477             Probably going to just leave the date string unprocessed (but we will
478             definitely slot it into files according to the adjusted zone.) Of
479             course, the date+hour+tz is used to memoize the outgoing date, so taking
480             the localtime and chunking that back together with the minutes+seconds
481             wouldn't be a big deal. We will need to address the dst issue though.
482              
483             =end tznotes
484              
485             =cut
486              
487             =head2 prep
488              
489             Parse a raw logfile and split it into hourly chunks.
490              
491             loghack prep servername/logfile.gz
492              
493             =cut
494              
495             sub do_prep {
496             my ($opt, @files) = @_;
497              
498             my $repo = $opt->{repository} or
499             die "must have repository setting for prep()\n";
500              
501             my $doskip;
502             my $skipper;
503             if(-e (my $skipconf = "$repo/.config/skips.conf")) {
504             my ($skip) = YAML::LoadFile($skipconf);
505             $skipper = ApacheLog::Parser::SkipList->new();
506             $skipper->set_config($skip);
507             $doskip = $skipper->get_matcher;
508             }
509              
510             my @loaded;
511             foreach my $file (@files) {
512             unless(-e $file) {
513             my $msg = "no such file:\n $file\n";
514             if($opt->{missok}) { warn $msg; next };
515             die $msg;
516             }
517             my $outpath = repository_path($opt, $file);
518             my $fh = open_file($file);
519              
520             my $nicename = nice_name($file);
521              
522             my $checksum = checksum($fh, 50);
523             my $checkfile = "$outpath.loaded/$checksum";
524             my $linecount = 0;
525             my $ch;
526             if(-e "$outpath/.loaded/$checksum") {
527             warn "assume $nicename is done\n";
528             {local $SIG{CHLD}; close($fh);} # stupid macs
529             next;
530             # TODO fast-forward support
531             # $linecount = $old_linecount; and etc
532             }
533             else {
534             record_source($opt, $file, "$outpath.sources/", $checksum);
535              
536             # TODO this could stand to be more atomic
537             { # record results
538             want_dir("$outpath.loaded");
539             my $tag = ($ENV{HOSTNAME} || '') . '.' . $$;
540             open($ch, '>', "$checkfile.$tag") or
541             die "cannot write '$checkfile.$tag' $!";
542             # TODO chmod
543             rename("$checkfile.$tag", $checkfile) or
544             die "cannot make $checkfile $!";
545             }
546              
547             # TODO a replayable pipe would be nice
548             {local $SIG{CHLD}; close($fh);} # stupid macs
549             $fh = open_file($file);
550             }
551              
552             $opt->{quiet} or print "$nicename -- ",
553             sprintf("%02d:%02d:%02d", (localtime)[2,1,0]), "\n";
554              
555             my %outhandles;
556             my $sw;
557             my $out;
558              
559             my $chunk = 2;
560             my $next_chunk = sub {
561             my ($date, $hour, $tz) = @_;
562              
563             # might have already started that chunk
564             if(my $handles = $outhandles{"$date$hour$tz"}) {
565             #warn "back to $date:$hour$tz\n";
566             ($out, $sw) = @$handles;
567             return;
568             }
569              
570             # TODO include timezone in this calc
571             my $datestring = get_datestring($date);
572              
573             # make the tz three digits
574             (my $tzout = $tz) =~ s/00$//;
575             $tzout = '+' . $tzout if(length($tzout) == 2);
576              
577             my $outfile = $outpath . $datestring . ".$hour$tzout.$chunk.tsv.gz";
578             push(@loaded, $outfile);
579             #warn "writing $outfile\n";
580             if(-e $outfile) {
581             # XXX how to decide whether to skip completely?
582             die "already have $outfile\n";
583             }
584             $chunk = 1; # from now on
585             if($skipper) { # TODO how to reset skipcount?
586             my $skipfile = skipfilename($opt, $outfile);
587             $sw = $skipper->new_writer($skipfile);
588             }
589             $out = pipe_out($outfile);
590             print $ch File::Basename::basename($outfile), "\n";
591             $outhandles{"$date$hour$tz"} = [$out, $sw];
592             };
593              
594             my $cdate = '';
595             my %lc;
596             while(my $line = <$fh>) {
597             $linecount++;
598             chomp($line);
599             my $v = parse_line($line);
600              
601             # check date/time
602             my ($d, $h, $rest) = split(/:/, $v->[dtime], 3);
603             my ($tz) = ($rest =~ m/ ([-+]?\d+)/);
604             if("$d$h$tz" ne $cdate) {
605             $next_chunk->($d, $h, $tz);
606             $cdate = "$d$h$tz";
607             $lc{$cdate} ||= 0;
608             #warn "$d $h $tz\n";
609             }
610             my $lnum = ++$lc{$cdate};
611              
612             # create skiplist
613             if($doskip->($v)) {$sw->skip($lnum);}
614              
615             print $out join("\t", @$v), "\n";
616             }
617              
618             print $ch "$linecount\n";
619             close($ch) or die "write '$checkfile' failed $!";
620             # TODO race checks/chmod
621              
622             }
623             wait(); # XXX need this?
624             return(@loaded);
625             }
626              
627             =for doc ###############################################################
628             Examine the */.loaded files and verify that each one has a linecount
629             (finished loading.)
630             loghack check */.loaded/*
631              
632             =cut
633              
634             sub do_check {
635             my ($opt, @files) = @_;
636              
637             foreach my $file (@files) {
638             my $err = run_check($file) or next;
639             print "NC $file (", scalar(@$err), " parts)\n";
640             }
641             }
642             sub do_sweep {
643             my ($opt, @files) = @_;
644              
645             foreach my $file (@files) {
646             my $err = run_check($file) or next;
647             print "NC $file (", scalar(@$err), " parts)\n";
648             foreach my $part (@$err) {
649             print " $part\n";
650             if(-e $part) {
651             unlink($part) or die "cannot unlink('$part') $!";
652             }
653             }
654             unlink($file) or die "cannot unlink('$file') $!";
655             }
656             }
657             sub run_check {
658             my ($checkfile) = @_;
659              
660             die "'$checkfile' is a directory" if(-d $checkfile);
661             open(my $fh, '<', $checkfile) or die "cannot read '$checkfile' $!";
662             my @list = map({chomp; $_} <$fh>);
663              
664             return() if(@list and $list[-1] and $list[-1] =~ m/^\d+$/);
665              
666             my $dir = File::Basename::dirname(File::Basename::dirname($checkfile));
667             return([map({"$dir/$_"} @list)]);
668             }
669              
670             sub _date_dwim {
671             my (@in) = @_;
672              
673             my @dates;
674             while(@in) {
675             my $date = shift(@in);
676              
677             if($date eq 'thru') {
678             push(@dates, date(pop(@dates))->thru(date(shift(@in))));
679             next;
680             }
681             push(@dates, $date);
682             }
683             return(@dates);
684             }
685              
686             =for doc ###############################################################
687             Given a date range, verify that all files + hours for that server are
688             done (with the exception of those listed in the .MIA file.)
689              
690             =cut
691              
692             sub do_verify {
693             my ($opt, @in) = @_;
694              
695             my @dates = _date_dwim(@in) or die "you gave no dates";
696             foreach my $dir (glob('*')) {
697             (-d $dir) or next;
698             foreach my $date (@dates) {
699             my @got = glob("$dir/$date*");
700             print "$dir/$date ", scalar(@got), "\n";
701             }
702             }
703             }
704              
705             =for doc ###############################################################
706             Make sure that all files are claimed somewhere. This is useful when a
707             load-in crashed.
708              
709             loghack confirm *
710              
711             =cut
712              
713             sub do_confirm {
714             my ($opt, @dirs) = @_;
715              
716             foreach my $dir (@dirs) {
717             my %loaded = map({$_ => 1} sub {
718             my ($s_dir) = @_;
719             $s_dir .= '/.loaded';
720             -d $s_dir or return();
721             opendir(my $dh, $s_dir) or die "cannot opendir '$s_dir' $!";
722             my @ans;
723             foreach my $name (grep({$_ !~ m/^\./} readdir($dh))) {
724             my $file = "$s_dir/$name";
725             open(my $fh, '<', $file) or die "cannot read '$file' $!";
726             my @list = map({chomp; $_} <$fh>);
727             pop(@list) if($list[-1] =~ m/^\d+$/);
728             push(@ans, @list);
729             }
730             return(@ans);
731             }->($dir)
732             );
733             $dir =~ s#/*$#/#;
734             opendir(my $dh, $dir) or die "cannot opendir '$dir' $!";
735             foreach my $name (grep({$_ !~ m/^\./} readdir($dh))) {
736             unless($loaded{$name}) {
737             print "$dir$name\n";
738             }
739             }
740             }
741             # TODO exit with error?
742             }
743              
744             =head2 list
745              
746             List files in the repository.
747              
748             loghack list 2008-01-01 thru 2008-01-31 in *
749              
750             =cut
751              
752             sub do_list {
753             my ($opt, @files) = @_;
754              
755             @files = repo_files($opt, @files);
756             print join("\n", @files), "\n";
757             }
758              
759             # TODO repo_hash as groups of 'in' bits
760              
761             # note: also parses stuff like '2007-10-12 in foo bar baz'
762              
763             sub repo_files {
764             my ($opt, @files) = @_;
765              
766             my $repo = $opt->{repository} or die "must have repository";
767              
768             my $spec_re = qr/(?:^\d{4}-\d{2}-\d{2}$)|\*/;
769             return(@files)
770             unless($files[0] =~ $spec_re and not -f $files[0]);
771              
772             my @dates;
773             while(@files) {
774             my $date = shift(@files);
775             if($date eq 'in') {
776             last;
777             }
778             elsif($date eq 'thru') {
779             push(@dates, date(pop(@dates))->thru(date(shift(@files))));
780             next;
781             }
782             $date =~ $spec_re or die "$date doesn't look like a date";
783             push(@dates, $date);
784             }
785              
786             # TODO need an iterator for this
787             my @globs;
788             if(my @dirs = @files) {
789             (-d $_) or die "$_ not a dir" for(@dirs);
790             @globs = map({my $d = $_; map({"$repo/$_/$d.*"} @dirs)} @dates);
791             }
792             else {
793             @globs = map({"$repo/$_"} @dates);
794             }
795             $opt->{lazy_glob} and return(@globs);
796             @files = ();
797             foreach my $glob (@globs) {
798             my @got = glob($glob);
799             (-f $_) or die "$_ is not a file" for(@got);
800             push(@files, @got);
801             }
802             return(@files);
803             }
804              
805             =head2 compile
806              
807             Assemble reports into daily chunks (in the .compiled/ directory.)
808              
809             loghack compile 2007-10-01
810              
811             =cut
812              
813             sub do_compile {
814             my ($opt, @dates) = @_;
815             foreach my $date (@dates) {
816             do_aggregate($opt, 'compile', $date);
817             }
818             }
819              
820             =head2 aggregate
821              
822             Build aggregate reports.
823              
824             loghack aggregate month $start_date
825              
826             loghack aggregate week $start_date
827              
828             =cut
829              
830             sub do_aggregate {
831             my ($opt, @spec) = @_;
832              
833             require ApacheLog::Parser::Report;
834              
835             my ($type, $date) = @spec;
836             my $name;
837             my @files;
838              
839             $opt->{quiet} or
840             printf("$date -- %02d:%02d:%02d\n", (localtime)[2,1,0]);
841             if($type eq 'compile') {
842             @files = repo_files($opt, $date, 'in', glob('*'));
843             want_dir($opt->{repository} . '/.compiled');
844             $name = '.compiled/' . $date;
845             }
846             else {
847             my %disp = (
848             month => sub {shift(@_)->end_of_month},
849             week => sub {shift(@_)+6},
850             );
851             $date = Date::Piece->new($date);
852             my $do = $disp{$type} or
853             die "'$type' must be one of ", join(', ', keys(%disp));
854             @files = map({$opt->{repository} . '/.compiled/' . $_ . '.yml'}
855             $date->thru($do->($date))
856             );
857             want_dir($opt->{repository} . '/.aggregate');
858             $name = ".aggregate/$type.$date";
859             }
860              
861             my $r_config = $opt->{repository} . '/.config/report.conf';
862             (-e $r_config) or die "must have a report config file $r_config";
863             my $rep = ApacheLog::Parser::Report->new(
864             conf => YAML::LoadFile($r_config)
865             );
866              
867             require YAML::Syck;
868              
869             foreach my $file (@files) {
870             my $report_file = ($file =~ m/\.yml$/) ?
871             $file : report_filename($opt, $file);
872             # warn "load $report_file\n";
873             my $data = YAML::Syck::LoadFile($report_file);
874             $rep->aggregate($data);
875             }
876              
877             # and save them
878             my $ag_name = $opt->{repository} . '/' . $name . '.yml';
879              
880             if($type eq 'compile') {
881             $rep->write_report($ag_name);
882             return;
883             }
884              
885             my ($text, $yaml) = $rep->print_report;
886              
887             {
888             open(my $ofh, '>', $ag_name) or
889             die "cannot write $ag_name $!";
890             print $ofh $yaml;
891             }
892             {
893             $ag_name =~ s/\.yml$/.txt/;
894             open(my $ofh, '>', $ag_name) or
895             die "cannot write $ag_name $!";
896             print $ofh $text;
897             }
898             }
899              
900             =head2 tabulate
901              
902             loghack tabulate daily 2007-10-01 thru 2007-10-31
903              
904             =cut
905              
906             sub do_tabulate {
907             my ($opt, @list) = @_;
908              
909             my $daily = ($list[0] eq 'daily') ? shift(@list) : 0;
910              
911             require ApacheLog::Parser::Report;
912              
913             my $outname;
914             my @files;
915             if($daily) {
916             if(@list == 3 and $list[1] eq 'thru') {
917             $outname = join('_', @list[0,2]);
918             $opt->{quiet} or print "$outname\n";
919             }
920             @list = sort(_date_dwim(@list));
921              
922             # TODO something to allow this-month-so-far
923             my %spec = map({
924             $_ => [
925             $opt->{repository} . '/.compiled/' . $_ . '.yml'
926             ]} @list);
927             @files = (\%spec, @list);
928             foreach my $file (map({@{$spec{$_}}} keys %spec)) {
929             (-e $file) or die "no such file: $file\n";
930             }
931             }
932             else {
933             @list = sort(@list);
934              
935             my $dir = $opt->{repository} . '/.aggregate/';
936             @files = map({$dir . $_ . '.yml'} @list);
937             foreach my $file (@files) {
938             (-e $file) or die "no such file: $file\n";
939             }
940             }
941              
942             my $r_config = $opt->{repository} . '/.config/report.conf';
943             (-e $r_config) or die "must have a report config file $r_config";
944             my $rep = ApacheLog::Parser::Report->new(
945             conf => YAML::LoadFile($r_config)
946             );
947              
948             my @table = $rep->table_report(@files);
949             unshift(@table, ['', @list]);
950             if($outname) {
951             my $t_dir = '.tables';
952             want_dir($t_dir);
953             my $file = "$t_dir/$outname.tsv";
954             open(my $fh, '>', $file) or die "cannot write '$file' $!";
955             print $fh join("\n", map({join("\t", @$_)} @table)), "\n";
956             close($fh) or die "cannot write '$file' $!";
957             $opt->{quiet} or print "wrote $file\n";
958             }
959             else {
960             print join("\n", map({join("\t", @$_)} @table)), "\n";
961             }
962             }
963              
964             =head2 report
965              
966             Crunch the prepared data and generate a report for the given chunk(s).
967              
968             loghack report $server/$chunk.tar.gz
969              
970             =cut
971              
972             sub do_report {
973             my ($opt, @files) = @_;
974              
975             require ApacheLog::Parser::Report;
976              
977             my $do_print = !$opt->{quiet};
978             my $show_status = sub {
979             my ($status) = @_;
980             $do_print or return;
981             printf("$status -- %02d:%02d:%02d\n", (localtime)[2,1,0]);
982             };
983             if(@files == 1 and $files[0] =~ m/\*$/) {
984             $show_status->($files[0]);
985             $do_print = 0;
986             }
987             @files = repo_files($opt, @files);
988             my $skipper = get_skipper($opt);
989              
990             my $r_config = $opt->{repository} . '/.config/report.conf';
991             (-e $r_config) or die "must have a report config file $r_config";
992              
993             foreach my $file (@files) {
994             my $fh = open_file($file);
995              
996             my $rep = ApacheLog::Parser::Report->new(
997             conf => YAML::LoadFile($r_config)
998             );
999             my $rfunc = $rep->get_func;
1000              
1001             my $report_file = report_filename($opt, $file);
1002             if(-e $report_file) {
1003             # TODO unless force or check staleness or something
1004             warn "skip (got) $file\n";
1005             next;
1006             }
1007              
1008             $show_status->($file);
1009              
1010             my $sr;
1011             if($skipper) {
1012             my $skipfile = skipfilename($opt, $file);
1013             (-e $skipfile) or die "missing $skipfile";
1014             $sr = $skipper->new_reader($skipfile);
1015             }
1016             my $skip = defined($sr) ? $sr->next_skip : 0;
1017              
1018             my $lnum = 0;
1019             while(my $line = <$fh>) {
1020             $lnum++;
1021             if($lnum == $skip) {
1022             $lnum += $sr->skip_lines($fh);
1023             $skip = $sr->next_skip;
1024             next;
1025             }
1026              
1027             my @v = split(/\t/, $line);
1028             # now for some reporting
1029             $rfunc->(\@v);
1030             #if($lnum > 100000) { warn "exit hack"; last;}
1031             }
1032             $rep->write_report($report_file);
1033             }
1034              
1035             }
1036              
1037             =head2 unique
1038              
1039             Experimental: count/report unique visitors within a chunk.
1040              
1041             =cut
1042              
1043             sub do_unique {
1044             my ($opt, @files) = @_;
1045              
1046             my $do_print = !$opt->{quiet};
1047             my $show_status = sub {
1048             my ($status) = @_;
1049             $do_print or return;
1050             printf("$status -- %02d:%02d:%02d\n", (localtime)[2,1,0]);
1051             };
1052             if(@files == 1 and $files[0] =~ m/\*$/) {
1053             $show_status->($files[0]);
1054             $do_print = 0;
1055             }
1056             @files = repo_files($opt, @files);
1057              
1058             foreach my $file (@files) {
1059             my $fh = open_file($file);
1060             $show_status->($file);
1061              
1062             my %unique;
1063             while(my $line = <$fh>) {
1064             my ($ip, $rest) = split(/\t/, $line);
1065             ($unique{$ip}||= 0)++;
1066             }
1067              
1068             my $u_file = uniques_filename($opt, $file);
1069             want_dir(File::Basename::dirname($u_file));
1070             my $ofh = pipe_out($u_file);
1071             print $ofh map({"$_\t$unique{$_}\n"} sort keys %unique);
1072             }
1073             }
1074              
1075             =head2 day_unique
1076              
1077             Experimental: count/report unique visitors within a day.
1078              
1079             =cut
1080              
1081             sub do_day_unique {
1082             my ($opt, @dates) = @_;
1083              
1084             want_dir($opt->{repository} . '/.day_uniques');
1085             foreach my $date (@dates) {
1086             $opt->{quiet} or
1087             printf("$date -- %02d:%02d:%02d\n", (localtime)[2,1,0]);
1088             my @files = repo_files($opt, $date, 'in', glob('*'));
1089             my %unique;
1090             foreach my $file (@files) {
1091             my $u_file = uniques_filename($opt, $file);
1092             my $fh = open_file($u_file);
1093             while(my $line = <$fh>) {
1094             chomp($line);
1095             my ($ip, $count) = split(/\t/, $line, 2);
1096             ($unique{$ip}||=0)+= $count;
1097             }
1098             }
1099             my $outfile = day_uniques_filename($opt, $date);
1100             my $ofh = pipe_out($outfile);
1101             print $ofh map({"$_\t$unique{$_}\n"} sort keys %unique);
1102             }
1103             }
1104              
1105             # here we run out of memory, so need a piecewise algorithm
1106              
1107             =head2 month_unique
1108              
1109             Experimental: count/report unique visitors within a month.
1110              
1111             =cut
1112              
1113             sub do_month_unique {
1114             my ($opt, $month) = @_;
1115              
1116             my $out_dir = $opt->{repository} . '/.month_uniques';
1117             want_dir($out_dir);
1118             my $source_dir = $opt->{repository} . '/.day_uniques';
1119              
1120             $opt->{quiet} or
1121             printf("$month -- %02d:%02d:%02d\n", (localtime)[2,1,0]);
1122             my @work;
1123             my $fill = sub {
1124             my ($index) = @_;
1125             my $val;
1126             unless(defined($val = readline($work[$index][2]))) {
1127             splice(@work, $index, 1);
1128             return;
1129             }
1130             chomp($val);
1131             @{$work[$index]}[0,1] = split(/\t/, $val, 2);
1132             };
1133             foreach my $file (glob("$source_dir/$month-*.gz")) {
1134             my $fh = open_file($file);
1135             push(@work, ['', 0, $fh]);
1136             $fill->($#work);
1137             }
1138              
1139             my $outfile = $out_dir . '/' . $month . '.gz';
1140             my $ofh = pipe_out($outfile);
1141              
1142             my $ucount = 0;
1143             my $outc = 0;
1144             while(@work) {
1145             (++$ucount % 1_000_000) or do {
1146             $opt->{quiet} or
1147             printf("$ucount ($outc) -- %02d:%02d:%02d\n", (localtime)[2,1,0])
1148             };
1149             my @l = sort({$work[$a][0] cmp $work[$b][0]} 0..$#work);
1150             my @o = (0);
1151             my $ip = $work[$l[0]][0];
1152             for(1..$#l) {
1153             ($ip eq $work[$l[$_]][0]) or last;
1154             push(@o, $_);
1155             }
1156              
1157             my $count = 0;
1158             $count += $work[$l[$_]][1] for(@o);
1159              
1160             # have to be careful about stale indices in removal
1161             $fill->($_) for(sort({$b <=> $a} @l[@o]));
1162             $outc += scalar(@o);
1163             print $ofh "$ip\t$count\n";
1164             }
1165              
1166             # and write the count
1167             {
1168             my $cfile = $out_dir . '/' . $month . '.count';
1169             open(my $out, '>', $cfile) or die "cannot write '$cfile' $!";
1170             print $out $ucount, "\n";
1171             print "$month -- $ucount\n";
1172             }
1173             }
1174              
1175             =head2 month_unique2
1176              
1177             Experimental: count/report unique visitors within a month (alternate,
1178             memory-hungry algorithm.)
1179              
1180             =cut
1181              
1182             sub do_month_unique2 {
1183             my ($opt, $month) = @_;
1184              
1185             my $out_dir = $opt->{repository} . '/.month_uniques';
1186             want_dir($out_dir);
1187             my $source_dir = $opt->{repository} . '/.day_uniques';
1188             my %unique;
1189             my $ucount = 0;
1190             foreach my $file (glob("$source_dir/$month-*.gz")) {
1191             my $fh = open_file($file);
1192             $opt->{quiet} or
1193             printf("$file -- %02d:%02d:%02d\n", (localtime)[2,1,0]);
1194             while(my $line = <$fh>) {
1195             chomp($line);
1196             my ($ip, $count) = split(/\t/, $line, 2);
1197             unless($unique{$ip}) {
1198             $unique{$ip} = 1;
1199             $ucount++;
1200             }
1201             }
1202             }
1203             my $outfile = $out_dir . '/' . $month . '.count';
1204             open(my $ofh, '>', $outfile) or
1205             die "cannot write '$outfile' $!";
1206             print $ofh $ucount, "\n";
1207             print $ucount, "\n";
1208             }
1209              
1210             =head2 makelinks
1211              
1212             Create hardlinks with dated names.
1213              
1214             =cut
1215              
1216             sub do_makelinks { # TODO --delete option?
1217             my ($opt, $ldir, @files) = @_;
1218              
1219             if(-e $ldir) {
1220             (-d $ldir) or die "USAGE: makelinks \n";
1221             }
1222             unless(-d $ldir) {
1223             mkdir($ldir) or die "cannot create $ldir $!";
1224             }
1225              
1226             foreach my $file (@files) {
1227             my $date = get_date($file);
1228              
1229             # XXX bah
1230             my $base = basename($file);
1231             my $dir = basename(dirname($file));
1232             my $dest = (-d "$ldir/$dir" ? "$ldir/$dir" : $ldir);
1233             my $ext;
1234             if($base =~ s/(?:\.\d+)?(\.(?:gz|bz2))?$//) {
1235             $ext = $1 || '';
1236             }
1237             $dest .= '/' . "$base.$date$ext";
1238              
1239             link($file, $dest) or die "cannot create link $!";
1240             }
1241             }
1242              
1243             =head2 import
1244              
1245             Run the prep, report, compile, and aggregate actions (nice for automatic
1246             daily imports.)
1247              
1248             loghack import $file1 $file2 ...
1249              
1250             =cut
1251              
1252             sub do_import {
1253             my ($opt, @files) = @_;
1254              
1255             my @loaded = do_prep($opt, @files);
1256              
1257             @loaded or die "imported nothing";
1258             do_report($opt, @loaded);
1259              
1260             my @dates = do {
1261             my %dates = map({m#.*/(\d{4}-\d{2}-\d{2})\..*#; ($1 => 1)} @loaded);
1262             sort(keys(%dates));
1263             };
1264              
1265             do_compile($opt, @dates);
1266              
1267             my @actions;
1268             foreach my $date (@dates) {
1269             $date = date($date);
1270              
1271             # TODO tabulate daily $date, 'thru', 'latest'
1272              
1273             # note: these triggers might look strange (a day late), but we don't
1274             # know we have all of last week's/month's data until we see some
1275             # trickle of the new week/month in the input -- if logrotate happens
1276             # after midnight, this will be only the first few minutes of the
1277             # week/month (though weekly logrotate with no interim updates will
1278             # definitely delay that on a slow site.)
1279              
1280             # TODO may need to retabulate farther back in slow-site cases
1281             # (except that should trigger by any bit of the trigger date being
1282             # in the new file?)
1283              
1284             if($date->day == 1) {
1285             my $last_month = $date-1*months;
1286             push(@actions, ['aggregate', 'month', $last_month]);
1287             push(@actions, ['tabulate', 'daily',
1288             $last_month, 'thru', $last_month->end_of_month]
1289             );
1290             }
1291             if($date->iso_wday == 1) {
1292             push(@actions, ['aggregate', 'week', $date - 1*weeks]);
1293             }
1294             }
1295             foreach my $action (@actions) {
1296             my @do = @$action;
1297             #warn "run @do\n";
1298             my $method = shift(@do);
1299             my $ref = __PACKAGE__->can('do_'.$method); # XXX go oo already
1300             $ref->($opt, @do);
1301             }
1302             }
1303              
1304             sub repository_path {
1305             my ($opt, $file) = @_;
1306              
1307             my $repo = $opt->{repository} or die "need repository name";
1308             $repo =~ s#/*$##;
1309              
1310             # TODO we need to sort-out this server bit
1311             my $dir = basename(dirname($file));
1312             my $dest = (-d "$repo/$dir" ? "$repo/$dir" : $repo);
1313             return("$dest/");
1314             }
1315             sub skipfilename {
1316             my ($opt, $file) = @_;
1317              
1318             my $repo = $opt->{repository} or die "need repository name";
1319             $repo =~ s#/*$##;
1320              
1321             my $base = basename($file);
1322             my $dest = dirname($file);
1323             $base =~ s/\.(tsv\.gz|gz|bz2)$//;
1324             $dest .= '/.skipdir/';
1325             unless(-d $dest) {
1326             unless(mkdir($dest)) {
1327             die "cannot create $dest dir $!" unless(-d $dest);
1328             }
1329             }
1330             $dest .= $base . '.skip';
1331             return($dest);
1332             }
1333             sub report_filename {
1334             my ($opt, $file) = @_;
1335             my $repo = $opt->{repository} or die "need repository name";
1336             $repo =~ s#/*$##;
1337              
1338             my $base = basename($file);
1339             my $dir = basename(dirname($file));
1340             my $dest = (-d "$repo/$dir" ? "$repo/$dir" : $repo);
1341             $base =~ s/\.(tsv\.gz|gz|bz2)$//;
1342             $dest .= '/.reports/';
1343             unless(-d $dest) {
1344             mkdir($dest) or die "cannot create $dest dir $!";
1345             }
1346             $dest .= $base . '.yml';
1347             return($dest);
1348             }
1349              
1350             sub inner_filename {
1351             my ($opt, $file, $indir, $ext) = @_;
1352             my $repo = $opt->{repository} or die "need repository name";
1353             $repo =~ s#/*$##;
1354              
1355             my $base = basename($file);
1356             my $dir = basename(dirname($file));
1357             my $dest = (-d "$repo/$dir" ? "$repo/$dir" : $repo);
1358             $base =~ s/\.(tsv\.gz|gz|bz2)$//;
1359             $dest .= "/$indir/";
1360             unless(-d $dest) {
1361             mkdir($dest) or die "cannot create $dest dir $!";
1362             }
1363             $dest .= $base . $ext;
1364             return($dest);
1365             }
1366             sub uniques_filename {
1367             my ($opt, $file) = @_;
1368             inner_filename($opt, $file, '.uniques', '.gz');
1369             }
1370             sub day_uniques_filename {
1371             my ($opt, $day) = @_;
1372             # TODO outer_filename ?
1373             $opt->{repository} . '/' . '.day_uniques/' . $day . '.gz';
1374             }
1375              
1376             sub get_skipper {
1377             my ($opt) = @_;
1378              
1379             my $repo = $opt->{repository} or die "need repository";
1380             $opt->{skip} or return();
1381              
1382             my $skipper;
1383             if(-e (my $skipfile = "$repo/.config/skips.conf")) {
1384             my ($skip) = YAML::LoadFile($skipfile);
1385             $skipper = ApacheLog::Parser::SkipList->new();
1386             $skipper->set_config($skip);
1387             }
1388             return($skipper);
1389             }
1390              
1391             =head2 count
1392              
1393             Count the records in a given chunk (accounting for skiplist.)
1394              
1395             =cut
1396              
1397             sub do_count {
1398             my ($opt, @files) = @_;
1399              
1400             @files = repo_files($opt, @files);
1401              
1402             my $skipper = get_skipper($opt);
1403             $skipper or die "you just want cat?";
1404             foreach my $file (@files) {
1405             my $fh = open_file($file);
1406              
1407             my $skipfile = skipfilename($opt, $file);
1408             my $sr = $skipper->new_reader($skipfile);
1409             my $skip = $sr->next_skip;
1410              
1411             my $lnum = 0;
1412             my $real = 0;
1413             while(my $line = <$fh>) {
1414             if(++$lnum == $skip) {
1415             $lnum += $sr->skip_lines($fh);
1416             $skip = $sr->next_skip;
1417             next;
1418             }
1419             $real++;
1420             }
1421             print join("\t", nice_name($file), $real, $lnum), "\n";
1422             }
1423             }
1424              
1425             =head2 dump
1426              
1427             Dump the records in a given chunk (accounting for skiplist.)
1428              
1429             =cut
1430              
1431             sub do_dump {
1432             my ($opt, @files) = @_;
1433              
1434             @files = repo_files($opt, @files);
1435              
1436             my $skipper = get_skipper($opt);
1437             $skipper or die "you just want cat?";
1438             foreach my $file (@files) {
1439             my $fh = open_file($file);
1440              
1441             my $skipfile = skipfilename($opt, $file);
1442             my $sr = $skipper->new_reader($skipfile);
1443             my $skip = $sr->next_skip;
1444              
1445             my $lnum = 0;
1446             while(my $line = <$fh>) {
1447             if(++$lnum == $skip) {
1448             $lnum += $sr->skip_lines($fh);
1449             $skip = $sr->next_skip;
1450             next;
1451             }
1452             print $line;
1453             }
1454             }
1455             }
1456              
1457             =head2 date
1458              
1459             Print a date for the first line in a raw logfile.
1460              
1461             date=$(loghack date logfile.gz)
1462              
1463             =cut
1464              
1465             sub do_date {
1466             my ($opt, $file) = @_;
1467             my $date = get_date($file);
1468             print $date, "\n";
1469             }
1470              
1471             # TODO put this is the Parser module?
1472             {
1473             my @abbr = qw( Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec );
1474             my %months = map({$abbr[$_] => sprintf("%02d", $_ + 1)} 0..11);
1475             sub get_datestring {
1476             my ($date) = @_;
1477             $date =~ s#^(\d+)/(\w+)/(\d+)$#"$3-".$months{$2}."-$1"#e;
1478             return($date);
1479             }
1480             sub get_date {
1481             my ($file) = @_;
1482             my $fh = open_file($file);
1483              
1484             chomp(my $line = <$fh>);
1485             my $date = @{parse_line($line)}[dtime];
1486              
1487             $date =~ s#^(\d+)/(\w+)/(\d+):.*#"$3-".$months{$2}."-$1"#e;
1488             return($date);
1489             }
1490             }
1491              
1492             =begin doc
1493              
1494             =head2 checksum
1495              
1496             Returns an md5 hexdigest of the first $nlines lines of the file (or the
1497             whole thing if $nlines is omitted.)
1498              
1499             my $md5 = checksum($fh, $nlines);
1500              
1501             =end doc
1502              
1503             =cut
1504              
1505             sub checksum {
1506             my ($fh, $num) = @_;
1507             $num ||= 0;
1508              
1509             my $data = '';
1510             my $count = 0;
1511             while(my $line = <$fh>) {
1512             $data .= $line;
1513             (++$count == $num) and last;
1514             }
1515             ($count >= $num) or croak("don't have $num lines to checksum");
1516              
1517             return(Digest::MD5::md5_hex($data));
1518             } # end subroutine checksum definition
1519             ########################################################################
1520              
1521             package main;
1522              
1523             if($0 eq __FILE__) {
1524             bin::loghack::main(@ARGV);
1525             }
1526              
1527             # vi:ts=2:sw=2:et:sta
1528             my $package = 'bin::loghack';