File Coverage

blib/lib/BerkeleyDB/SecIndices/Accessor.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package BerkeleyDB::SecIndices::Accessor;
2              
3 1     1   56337 use 5.005;
  1         4  
  1         37  
4 1     1   5 use strict;
  1         3  
  1         34  
5 1     1   4 use warnings;
  1         7  
  1         148  
6              
7             require Exporter;
8              
9             our @ISA = qw(Exporter);
10              
11             our %EXPORT_TAGS = ( 'const' => [ qw(ELCK EPUT EDEL EUPD EGET EGTS
12             EDUP EEPT ELOCK TRUE
13             ) ] );
14              
15             our @EXPORT_OK = ( @{ $EXPORT_TAGS{'const'} } );
16              
17             our @EXPORT = qw(EGET EGTS EEPT EPUT ELCK EUPD);
18              
19             our $VERSION = '0.06';
20              
21 1     1   5 use Carp qw(croak);
  1         2  
  1         72  
22 1     1   9045 use BerkeleyDB;
  0            
  0            
23             use File::Spec ();
24             use Storable qw(freeze thaw);
25             local $Storable::canonical = 1;
26              
27             # global container to hold _ALL_ related stuff
28             my $STUBS = {};
29              
30             # max retry times to obtain a cdb lock
31             our $LOCK_RETRY_MAX = 5;
32              
33             # path to database configuration file
34             # of YAML format
35             our $CONFIG;
36             my $_config;
37              
38             # configurable callbacks
39             # refer to BerkeleyDB on how to write
40             our $CB_EXTRACT_SECKEY;
41             our $CB_DUP;
42             our $CB_DUPSORT;
43              
44             # debug flag
45             # no debug ouput by default
46             our $DEBUG = 0;
47              
48             sub ELCK() { -2; }
49             sub ELOCK() { ELCK(); }
50             sub EPUT() { -1; }
51             sub EDEL() { -1; }
52             sub EUPD() { -1; }
53             sub EGET() { -1; }
54             sub EGTS() { -1; }
55             sub EDUP() { -1; }
56             sub EEPT() { -3; }
57             sub TRUE() { 1; }
58              
59             BEGIN {
60             if ($BerkeleyDB::db_version < 3.3) {
61             croak("BerkeleyDB ver 3.3.x required!");
62             }
63             }
64              
65             # This section inits _ALL_ databases
66             # _ALL_ db handlers are stored in $db_pool with specific keys
67             # the key is contructed according to keys %{$_config->{DATABASE}}
68              
69             {
70             use YAML ();
71             ( $_config ) = YAML::LoadFile($CONFIG);
72             # soft check
73             croak("no database HOME found") unless
74             exists $_config->{HOME} and -d $_config->{HOME};
75             croak("wrong database declaration") unless
76             ref $_config->{DATABASE} eq 'HASH';
77            
78             # Global Env for _ALL_ DBs
79             # created in system shared memory by flag DB_SYSTEM_MEM
80             # Shared memory key can also be specified in DB_CONFIG
81             my $Env = BerkeleyDB::Env::->new(
82             -Home => $_config->{HOME},
83             #-Cachesize => 2000000,
84             -ErrFile => *STDERR,
85             -ErrPrefix => __PACKAGE__,
86             -Flags => DB_CREATE|DB_INIT_CDB|DB_INIT_MPOOL|DB_SYSTEM_MEM,
87             -Verbose => 1,
88             );
89             croak("cannot create dbenv") unless $Env and $Env->status() == 0;
90             my $DB = $_config->{DATABASE};
91             # container to hold _ALL_ dbs
92             my $db_pool = {};
93             # container to check required index fields for primary database
94             my $required_index = {};
95            
96             # DIRTY CODE START
97             no strict 'refs';
98             my $getenv = '___dbenv';
99            
100             *$getenv = sub() { $Env; };
101            
102             my $makekey = sub {
103             my ( $primary, $index ) = @_;
104             if ($index) {
105             return '_'. lc($primary). '_'. lc($index);
106             }
107             else {
108             return '_'. lc($primary);
109             }
110             };
111            
112             my $_db_property = DB_DUP;
113             if (defined $CB_DUPSORT and ref $CB_DUPSORT eq 'CODE') {
114             $_db_property |= DB_DUPSORT;
115             }
116             foreach my $i (keys %$DB) {
117             if ($i =~ m/INDEX$/o) {
118             # index db
119             # open SUBS
120             foreach my $j (0 .. $#{$DB->{$i}->{SUBS}}) {
121             my $params = {
122             -Filename => $DB->{$i}->{FILE},
123             -Subname => $DB->{$i}->{SUBS}->[$j],
124             -Flags => DB_CREATE|DB_DIRECT_DB,
125             -Property => $_db_property,
126             -Mode => 0644,
127             -Env => $Env,
128             };
129             if (defined $CB_DUP and ref $CB_DUP eq 'CODE') {
130             $params->{-Compare} = $CB_DUP;
131             }
132             if (defined $CB_DUPSORT and
133             ref $CB_DUPSORT eq 'CODE') {
134             $params->{-DupCompare} = $CB_DUPSORT;
135             }
136             my $db = BerkeleyDB::Btree::->new(%$params);
137             my $key = $makekey->($i, $DB->{$i}->{SUBS}->[$j]);
138             #print 'key: ', $key, "\n";
139             #print "CDS: ". $db->cds_enabled() ? 'true' : 'false', "\n";
140             $db_pool->{$key} = $db;
141             *$key = sub() { $db_pool->{$key}; };
142             }
143             }
144             else {
145             my $_type = 0; # Recno by default
146             if (exists $DB->{$i}->{TYPE}) {
147             # honor type
148             #print STDERR $i, ": ", $DB->{$i}->{TYPE}, "\n";
149             if ($DB->{$i}->{TYPE} eq 'Hash') {
150             $_type = 1;
151             }
152             elsif ($DB->{$i}->{TYPE} eq 'Btree') {
153             $_type = 2;
154             }
155             else {
156             $_type = 0;
157             }
158             }
159             my $db;
160             if ($_type == 0) {
161             $db = BerkeleyDB::Recno::->new(
162             -Filename => $DB->{$i}->{FILE},
163             -Flags => DB_CREATE|DB_DIRECT_DB,
164             -Mode => 0644,
165             -Env => $Env,
166             );
167             }
168             elsif ($_type == 1) {
169             $db = BerkeleyDB::Hash::->new(
170             -Filename => $DB->{$i}->{FILE},
171             -Flags => DB_CREATE|DB_DIRECT_DB,
172             -Property => DB_DUP,
173             -Mode => 0644,
174             -Env => $Env,
175             );
176             }
177             else {
178             $db = BerkeleyDB::Btree::->new(
179             -Filename => $DB->{$i}->{FILE},
180             -Flags => DB_CREATE|DB_DIRECT_DB,
181             -Property => DB_DUP,
182             -Mode => 0644,
183             -Env => $Env,
184             );
185             }
186            
187             my $key = $makekey->($i);
188             $db_pool->{$key} = $db;
189             *$key = sub() { $db_pool->{$key}; };
190             }
191             }
192              
193             # associate each index db with primary db
194             foreach my $i (keys %$DB) {
195             if ($i =~ m/INDEX$/o) {
196             # BLAH_INDEX associated with $db_pool->{_blah}
197             foreach my $j (0 .. $#{$DB->{$i}->{SUBS}}) {
198             my $secondary = $makekey->($i, $DB->{$i}->{SUBS}->[$j]);
199             ( my $p = $i ) =~ s/_INDEX$//o;
200             my $primary = $makekey->($p);
201             push @{$required_index->{$p}}, $DB->{$i}->{SUBS}->[$j];
202            
203             unless (exists $db_pool->{$primary}) {
204             croak("database $primary not found");
205             }
206             unless (exists $db_pool->{$secondary}) {
207             croak("database $secondary not found");
208             }
209             my $_extract_seckey = sub {
210             my $pkey = shift;
211             # $pdata is freezed by Storable
212             my $pdata = shift;
213             my $hcontent = thaw($pdata);
214             my $k = $hcontent->{$DB->{$i}->{SUBS}->[$j]};
215             #print STDERR 'secondary key: '. $k, "\n";
216             if (ref $k eq 'ARRAY') {
217             # special index for array
218             # array items should be recno
219             # FOR scenarios and cases
220             my $skey = '';
221             foreach (@$k) {
222             vec($skey, $_, 1) = 1;
223             }
224             $_[0] = $skey;
225             }
226             else {
227             # normal SCALAR
228             $_[0] = $k;
229             }
230             return 0;
231             };
232            
233             my $rc;
234             if (defined $CB_EXTRACT_SECKEY and
235             ref $CB_EXTRACT_SECKEY eq 'CODE') {
236             $rc = $db_pool->{$primary}->associate(
237             $db_pool->{$secondary},
238             $CB_EXTRACT_SECKEY);
239             }
240             else {
241             $rc = $db_pool->{$primary}->associate(
242             $db_pool->{$secondary}, $_extract_seckey);
243             }
244             unless ($rc == 0) {
245             croak("cannot associate index $secondary ".
246             "with primary database $primary");
247             }
248             }
249             }
250             }
251            
252             # make accessor for each primary database
253             foreach my $i (keys %$DB) {
254             if ($i !~ m/INDEX$/o) {
255             # primary database
256             my $put = 'put_'. lc($i);
257             my $put2 = 'put2_'. lc($i);
258             my $upd = 'upd_'. lc($i);
259             # TODO: update version 2
260             my $upd2 = 'upd2_'. lc($i);
261             my $get = 'get_'. lc($i);
262             my $gets = 'get_'. lc($i). 's';
263             my $count = '__'. lc($i). 's';
264             my $dels = 'del_'. lc($i). 's';
265             $STUBS->{$i}->{PUT} = sub { __PACKAGE__->$put(@_); };
266             $STUBS->{$i}->{PUT2} = sub { __PACKAGE__->$put2(@_); };
267             $STUBS->{$i}->{UPD} = sub { __PACKAGE__->$upd(@_); };
268             #$STUBS->{$i}->{UPD2} = sub { __PACKAGE__->$upd2(@_); };
269             $STUBS->{$i}->{GET} = sub { __PACKAGE__->$get(@_); };
270             $STUBS->{$i}->{GETS} = sub { __PACKAGE__->$gets(@_); };
271             $STUBS->{$i}->{COUNT} = sub { __PACKAGE__->$count(@_); };
272             $STUBS->{$i}->{DEL} = sub { __PACKAGE__->$dels(@_); };
273            
274             if (exists $DB->{$i}->{TYPE} and
275             ($DB->{$i}->{TYPE} eq 'Hash' or
276             $DB->{$i}->{TYPE} eq 'Btree')) {
277             *$put = sub {
278             my ( $self, $k, $hcontent ) = @_;
279             croak("ref $_[2] ne 'HASH'") unless
280             ref $hcontent eq 'HASH';
281             # check required index fields
282             foreach my $field (@{$required_index->{$i}}) {
283             if (not exists $hcontent->{$field}) {
284             croak("required index field $field ".
285             "not found");
286             }
287             }
288             my $fcontent = freeze($hcontent);
289             my $key = $makekey->($i);
290             # validate $lock
291             my $lock;
292             my $retry = 0;
293             LOCK:
294             {
295             $lock = $db_pool->{$key}->cds_lock();
296             last LOCK if defined $lock;
297             sleep 3;
298             redo LOCK if ++$retry < $LOCK_RETRY_MAX;
299             }
300             unless (defined $lock) {
301             return ELCK;
302             }
303             my $rc = $db_pool->{$key}->db_put(
304             $k, $fcontent, DB_NOOVERWRITE);
305             # no cache in memo
306             $db_pool->{$key}->db_sync();
307             $lock->cds_unlock();
308             return $rc == 0 ? TRUE : EPUT;
309             };
310            
311             *$put2 = sub {
312             my ( $self, $rpairs ) = @_;
313             return TRUE if keys %$rpairs == 0;
314            
315             croak("ref $_[1] ne 'HASH'") unless
316             ref $rpairs eq 'HASH';
317             # check required index fields
318             foreach my $field (@{$required_index->{$i}}) {
319             my $v;
320             while (( undef, $v ) = each %$rpairs) {
321             if (not exists $v->{$field}) {
322             croak("required index field $field ".
323             "not found");
324             }
325             }
326             }
327             my $key = $makekey->($i);
328             # validate $lock
329             my $lock;
330             my $retry = 0;
331             LOCK:
332             {
333             $lock = $db_pool->{$key}->cds_lock();
334             last LOCK if defined $lock;
335             sleep 3;
336             redo LOCK if ++$retry < $LOCK_RETRY_MAX;
337             }
338             unless (defined $lock) {
339             return ELOCK;
340             }
341            
342             my ( $k, $v, $rc );
343             $rc = 0;
344             while (( $k, $v ) = each %$rpairs) {
345             $rc += $db_pool->{$key}->db_put(
346             $k, freeze($v), DB_NOOVERWRITE);
347             }
348             # no cache in memo
349             $db_pool->{$key}->db_sync();
350             $lock->cds_unlock();
351             return $rc == 0 ? TRUE : EPUT;
352             };
353             }
354             else {
355             # Recno by default
356             *$put = sub {
357             my ( $self, @hcontent ) = @_;
358             croak("ref $_[1] ne 'HASH'") unless
359             ref $hcontent[0] eq 'HASH';
360             # check required index fields
361             foreach my $field (@{$required_index->{$i}}) {
362             if (not exists $hcontent[0]->{$field}) {
363             croak("required index field $field ".
364             "not found");
365             }
366             }
367             my @fcontent = map { freeze($_) } @hcontent;
368             my $key = $makekey->($i);
369             # validate $lock
370             my $lock;
371             my $retry = 0;
372             LOCK:
373             {
374             $lock = $db_pool->{$key}->cds_lock();
375             last LOCK if defined $lock;
376             sleep 3;
377             redo LOCK if ++$retry < $LOCK_RETRY_MAX;
378             }
379             unless (defined $lock) {
380             return ELCK;
381             }
382             my ( $k, $first_key, $rc );
383             $rc = 0;
384             foreach (@fcontent) {
385             $k = -1;
386             $rc += $db_pool->{$key}->db_put(
387             $k, $_, DB_APPEND);
388             $first_key = $k unless defined $first_key;
389             }
390             # no cache in memo
391             $db_pool->{$key}->db_sync();
392             $lock->cds_unlock();
393             return $rc == 0 ? $first_key : EPUT;
394             };
395            
396             *$put2 = sub {
397             my ( $self, $hcontent_array, $key_array ) = @_;
398             return TRUE if @$hcontent_array == 0;
399             croak('ref $_[1]->[0] ne "HASH"') unless
400             ref($hcontent_array->[0]) eq 'HASH';
401             # check required index fields
402             foreach my $field (@{$required_index->{$i}}) {
403             if (not exists $hcontent_array->[0]->{$field}) {
404             croak("required index field $field ".
405             "not found");
406             }
407             }
408             my $key = $makekey->($i);
409             # validate $lock
410             my $lock;
411             my $retry = 0;
412             LOCK:
413             {
414             $lock = $db_pool->{$key}->cds_lock();
415             last LOCK if defined $lock;
416             sleep 3;
417             redo LOCK if ++$retry < $LOCK_RETRY_MAX;
418             }
419             unless (defined $lock) {
420             return ELOCK;
421             }
422             my ( $k, $rc, $rc_all );
423             $rc_all = 0;
424             foreach my $v (@$hcontent_array) {
425             $k = -1;
426             $rc = $db_pool->{$key}->db_put(
427             $k, freeze($v), DB_APPEND);
428             push @$key_array, $k if $rc == 0;
429             $rc_all += $rc;
430             }
431             # no cache in memo
432             $db_pool->{$key}->db_sync();
433             $lock->cds_unlock();
434             return $rc_all == 0 ? TRUE : EPUT;
435             };
436             }
437            
438             *$upd = sub {
439             my ( $self, $k, $hcontent ) = @_;
440             croak("ref $_[2] ne 'HASH'") unless
441             ref $hcontent eq 'HASH';
442             my ( $v, $rc );
443             $v = '';
444             my $key = $makekey->($i);
445             # validate $lock
446             my $lock;
447             my $retry = 0;
448             LOCK:
449             {
450             $lock = $db_pool->{$key}->cds_lock();
451             last LOCK if defined $lock;
452             sleep 3;
453             redo LOCK if ++$retry < $LOCK_RETRY_MAX;
454             }
455             unless (defined $lock) {
456             return ELCK;
457             }
458             # FIXME evaluate $cursor
459             my $cursor = $db_pool->{$key}->db_cursor(DB_WRITECURSOR);
460             $rc = $cursor->c_get($k, $v, DB_SET);
461             #print STDERR "key : $k\n";
462             #print STDERR "rc : $rc\n";
463             return EUPD unless $rc == 0;
464             my $content = thaw($v);
465             foreach (keys %$hcontent) {
466             $content->{$_} = $hcontent->{$_};
467             }
468             my $fcontent = freeze($content);
469             $rc = $cursor->c_put($k, $fcontent, DB_CURRENT);
470             # no cache in memo
471             $db_pool->{$key}->db_sync();
472             $lock->cds_unlock();
473             $cursor->c_close();
474             return $rc == 0 ? 0 : EUPD;
475             };
476            
477             *$get = sub {
478             my ( $self, $k ) = @_;
479             my $key = $makekey->($i);
480             my ( $fcontent, $rc );
481             $rc = $db_pool->{$key}->db_get($k, $fcontent);
482             #print STDERR "rc = ", $rc, "\n";
483             if ($rc == 0) {
484             return thaw($fcontent);
485             }
486             else {
487             if ($rc == DB_NOTFOUND or $rc == DB_KEYEMPTY) {
488             return EEPT;
489             }
490             else {
491             if ($DEBUG) {
492             print STDERR __PACKAGE__, ": get_xxx failed\n";
493             }
494             return EGET;
495             }
496             }
497             };
498              
499             *$gets = sub {
500             my ( $self, $n, $desc, $offset ) = @_;
501             my $key = $makekey->($i);
502             my ( $k, $fcontent, $rc );
503             $k = -1;
504             $desc ||= 0;
505             my $cursor = $db_pool->{$key}->db_cursor();
506             $rc = $cursor->c_get(
507             $k, $fcontent, $desc ? DB_LAST : DB_FIRST);
508             if ($rc == DB_NOTFOUND or $rc == DB_KEYEMPTY) {
509             $cursor->c_close();
510             return [];
511             }
512             if (defined $offset and $offset > 0) {
513             OFFSET:
514             for (my $i = 0; $rc == 0 or $rc == DB_KEYEMPTY;
515             $rc = $cursor->c_get($k, $fcontent,
516             $desc ? DB_PREV :
517             DB_NEXT)
518             ) {
519             last OFFSET if $i == $offset;
520             $i++ if $rc == 0;
521             }
522             }
523             my $ret = [];
524             FETCH:
525             for (my $i = 0; $rc == 0 or $rc == DB_KEYEMPTY;
526             $rc = $cursor->c_get(
527             $k, $fcontent, $desc ? DB_PREV : DB_NEXT)) {
528             last FETCH if $i == $n;
529             if ($rc == 0) {
530             my $entry = {
531             KEY => $k,
532             CONTENT => thaw($fcontent),
533             };
534             push @$ret, $entry;
535             $i++;
536             }
537             }
538             $cursor->c_close();
539             return $ret;
540             };
541            
542             *$count = sub {
543             my $key = $makekey->($i);
544             my $stat = $db_pool->{$key}->db_stat();
545             if (exists $DB->{$i}->{TYPE} and
546             $DB->{$i}->{TYPE} eq 'Hash') {
547             return $stat->{hash_ndata};
548             }
549             else {
550             return $stat->{bt_ndata};
551             }
552             };
553            
554             *$dels = sub {
555             my ( $self, @n ) = @_;
556             return 0 if @n == 0;
557             my $key = $makekey->($i);
558             my $deleted = 0;
559             # validate $lock
560             my $lock;
561             my $retry = 0;
562             LOCK:
563             {
564             $lock = $db_pool->{$key}->cds_lock();
565             last LOCK if defined $lock;
566             sleep 3;
567             redo LOCK if ++$retry < $LOCK_RETRY_MAX;
568             }
569             unless (defined $lock) {
570             return ELCK;
571             }
572             foreach my $recno (@n) {
573             if ($db_pool->{$key}->db_del($recno) == 0) {
574             $deleted++;
575             }
576             }
577             $db_pool->{$key}->db_sync();
578             $lock->cds_unlock();
579             return $deleted;
580             };
581             }
582             else {
583             # index database(s)
584             foreach my $j (0 .. $#{$DB->{$i}->{SUBS}}) {
585             ( my $p = $i ) =~ s/_INDEX$//o;
586             my $get = 'get_'. lc($p).
587             's_by_'. lc($DB->{$i}->{SUBS}->[$j]);
588             my $cat = 'cat_'. lc($i). '_'. lc($DB->{$i}->{SUBS}->[$j]).
589             's';
590             my $count =
591             '__'. lc($i). '_'. lc($DB->{$i}->{SUBS}->[$j]). 's';
592             my $countdup =
593             '__'. lc($i). '_'. lc($DB->{$i}->{SUBS}->[$j]). '_dups';
594            
595             $STUBS->{$p}->{FIELDS}->{lc($DB->{$i}->{SUBS}->[$j])} =
596             sub { __PACKAGE__->$get(@_); };
597             $STUBS->{$i}->{CAT}->{lc($DB->{$i}->{SUBS}->[$j])}
598             = sub { __PACKAGE__->$cat(@_); };
599             $STUBS->{$i}->{COUNT}->{lc($DB->{$i}->{SUBS}->[$j])} =
600             sub { __PACKAGE__->$count(@_); };
601             $STUBS->{$i}->{COUNTDUP}->{lc($DB->{$i}->{SUBS}->[$j])}
602             = sub { __PACKAGE__->$countdup(@_); };
603              
604             *$get = sub {
605             # FIXME ugly api..
606             # TODO hash param
607             my ( $self, $k, $returnValue, $lastone,
608             $n, $offset ) = @_;
609             $returnValue ||= 0;
610             croak("undefined key: $k") unless defined $k;
611             return [] if defined $n and $n <= 0;
612             my $key = $makekey->($i, $DB->{$i}->{SUBS}->[$j]);
613             #print "key = ", $key, "\n";
614             my ( $rc, $pk, $v );
615             my $ret = [];
616            
617             $pk = -1;
618             #$rc = $db_pool->{$key}->db_pget($k, $pk, $v);
619             #if ($rc == DB_NOTFOUND or $rc == DB_KEYEMPTY) {
620             # return $ret;
621             #}
622             my $cursor = $db_pool->{$key}->db_cursor();
623             #print STDERR "status: ", $db_pool->{$key}->status(),
624             # "\n";
625             # set cursor to begin of $k slot
626             $rc = $cursor->c_pget($k, $pk, $v, DB_SET);
627             if ($rc == DB_SECONDARY_BAD) {
628             $cursor->c_close();
629             croak("bad secondary index found");
630             }
631             if ($rc == DB_KEYEMPTY or $rc == DB_NOTFOUND) {
632             $cursor->c_close();
633             return $ret;
634             }
635             my $dup_count = 0;
636             $rc = $cursor->c_count($dup_count);
637             unless ($rc == 0) {
638             $cursor->c_close();
639             croak("cannot count duplicate keys");
640             }
641             # offset out of range
642             return $ret if defined $offset and
643             $offset < 0 and -$offset >= $dup_count;
644            
645             if ($n and defined $offset) {
646             # splice
647             if ($offset >= 0) {
648             if ($offset > 0) {
649             OFFSET:
650             for (my $i = 0; $rc == 0;
651             $rc = $cursor->c_pget(
652             $k, $pk, $v, DB_NEXT_DUP)) {
653             last OFFSET if $i == $offset;
654             $i++;
655             }
656             }
657             my $i = 0;
658             FETCH:
659             {
660             last FETCH if $i == $n;
661             if ($BerkeleyDB::VERSION < 0.29) {
662             # Bug fix for BerkeleyDB v0.27
663             $pk = unpack("L", $pk) - 1;
664             }
665             if ($returnValue) {
666             my $entry = {
667             KEY => $pk,
668             CONTENT => thaw($v),
669             };
670             push @$ret, $entry;
671             } else {
672             push @$ret, $pk;
673             }
674             $i++;
675             $pk = -1;
676             redo FETCH if $cursor->c_pget(
677             $k, $pk, $v, DB_NEXT_DUP) == 0;
678             }
679             }
680             else {
681             # $offset < 0
682             my ( $start_index, $end_index );
683             if ($n >= $dup_count+$offset) {
684             $start_index = 0;
685             $end_index = $dup_count+$offset;
686             }
687             else {
688             $end_index = $dup_count+$offset+1;
689             $start_index = $end_index-$n;
690             }
691             if ($start_index > 0) {
692             OFFSET:
693             for (my $i = 0; $rc == 0;
694             $rc = $cursor->c_pget(
695             $k, $pk, $v, DB_NEXT_DUP)) {
696             last OFFSET if $i++ == $start_index;
697             }
698             }
699             my $i = $start_index;
700             FETCH:
701             {
702             last FETCH if $i > $end_index;
703             if ($BerkeleyDB::VERSION < 0.29) {
704             # Bug fix for BerkeleyDB v0.27
705             $pk = unpack("L", $pk) - 1;
706             }
707             if ($returnValue) {
708             my $entry = {
709             KEY => $pk,
710             CONTENT => thaw($v),
711             };
712             push @$ret, $entry;
713             }
714             else {
715             push @$ret, $pk;
716             }
717             $i++;
718             $pk = -1;
719             redo FETCH if $cursor->c_pget(
720             $k, $pk, $v, DB_NEXT_DUP) == 0;
721             }
722             }
723             return $ret;
724             }
725             else {
726             my $last;
727             FETCH:
728             {
729             if ($BerkeleyDB::VERSION < 0.29) {
730             # Bug fix for BerkeleyDB v0.27
731             $pk = unpack("L", $pk) - 1;
732             }
733             my $entry;
734             if ($returnValue) {
735             $entry = {
736             KEY => $pk,
737             CONTENT => thaw($v),
738             };
739             push @$ret, $entry;
740             } else {
741             push @$ret, $pk;
742             }
743             $last = $entry;
744             $pk = -1;
745             redo FETCH if $cursor->c_pget(
746             $k, $pk, $v, DB_NEXT_DUP) == 0;
747             }
748             $cursor->c_close();
749             if (not $lastone) {
750             return $ret;
751             } else {
752             return [ $last ];
753             }
754             }
755             # NOREACH
756             };
757            
758             *$cat = sub {
759             my ( $self, $value ) = @_;
760             my $key = $makekey->($i, $DB->{$i}->{SUBS}->[$j]);
761             #print STDERR
762             # 'database:'. $DB->{$i}->{SUBS}->[$j]. "\n";
763             my $cursor = $db_pool->{$key}->db_cursor();
764            
765             my $ret = {};
766             my ( $k, $pk, $v );
767             $k = $pk = -1;
768             $v = '';
769             # FIXME db_stat first to get key count
770             while ($cursor->c_pget(
771             $k, $pk, $v, DB_NEXT) == 0) {
772             if ($BerkeleyDB::VERSION < 0.29) {
773             # Bug fix for BerkeleyDB v0.27
774             $pk = unpack("L", $pk) - 1;
775             #print STDERR 'length of key:'. length($k). "\n";
776             }
777             if ($value) {
778             push @{$ret->{$k}}, {
779             KEY => $pk,
780             CONTENT => thaw($v),
781             };
782             }
783             else {
784             push @{$ret->{$k}}, $pk;
785             }
786             $k = $pk = -1;
787             $v = '';
788             }
789             $cursor->c_close();
790             return $ret;
791             };
792            
793             *$count = sub {
794             my $key = $makekey->($i, $DB->{$i}->{SUBS}->[$j]);
795             my $stat = $db_pool->{$key}->db_stat();
796             return $stat->{bt_ndata};
797             };
798              
799             *$countdup = sub {
800             my ( $self, $k) = @_;
801             croak("undefined key: $k") unless defined $k;
802             my $key = $makekey->($i, $DB->{$i}->{SUBS}->[$j]);
803             #print "key = ", $key, "\n";
804             my ( $rc, $pk, $v );
805             my $ret;
806            
807             $pk = -1;
808             #$rc = $db_pool->{$key}->db_pget($k, $pk, $v);
809             #if ($rc == DB_NOTFOUND or $rc == DB_KEYEMPTY) {
810             # return EDUP;
811             #}
812             my $cursor = $db_pool->{$key}->db_cursor();
813             #print STDERR "status: ", $db_pool->{$key}->status(),
814             # "\n";
815             # set cursor to begin of $k slot
816             $rc = $cursor->c_pget($k, $pk, $v, DB_SET);
817             if ($rc == DB_SECONDARY_BAD) {
818             $cursor->c_close();
819             croak("bad secondary index found");
820             }
821             if ($rc == DB_KEYEMPTY or $rc == DB_NOTFOUND) {
822             $cursor->c_close();
823             return 0;
824             }
825             $ret = EDUP;
826             $rc = $cursor->c_count($ret);
827             $cursor->c_close();
828             return $rc == 0 ? $ret : EDUP;
829             };
830            
831             }
832             }
833             }
834             my $stubs = '_stubs';
835             *$stubs = sub() { $STUBS; };
836             use strict 'refs';
837             # DIRTY CODE END
838             }
839              
840             1;
841             __END__