File Coverage

blib/lib/Perinci/Tx/Manager.pm
Criterion Covered Total %
statement 529 627 84.3
branch 241 380 63.4
condition 76 111 68.4
subroutine 60 68 88.2
pod 15 19 78.9
total 921 1205 76.4


line stmt bran cond sub pod time code
1             package Perinci::Tx::Manager;
2              
3             our $DATE = '2017-07-10'; # DATE
4             our $VERSION = '0.57'; # VERSION
5              
6 4     4   127130 use 5.010001;
  4         25  
7 4     4   34 use strict;
  4         12  
  4         156  
8 4     4   34 use warnings;
  4         19  
  4         240  
9 4     4   33 use experimental 'smartmatch';
  4         11  
  4         42  
10 4     4   395 use Log::ger;
  4         12  
  4         43  
11              
12 4     4   13784 use DBI;
  4         78838  
  4         361  
13 4     4   3282 use File::Flock::Retry;
  4         3395  
  4         157  
14 4     4   538 use File::Remove qw(remove);
  4         1842  
  4         298  
15 4     4   2713 use JSON::MaybeXS;
  4         24821  
  4         343  
16 4     4   59 use Perinci::Sub::Util qw(err);
  4         12  
  4         242  
17 4     4   32 use Scalar::Util qw(blessed);
  4         14  
  4         202  
18 4     4   30 use Package::MoreUtil qw(package_exists);
  4         12  
  4         203  
19 4     4   2707 use Time::HiRes qw(time);
  4         5710  
  4         24  
20 4     4   2785 use UUID::Random;
  4         944  
  4         34693  
21              
22             # patch, add special action to just retrieve code and meta
23             require Perinci::Access::Schemeless;
24             package
25             Perinci::Access::Schemeless;
26              
27             sub actionmeta_get_code_and_meta { +{
28 675     675 0 154583 applies_to => ['function'],
29             summary => "Get code and metadata",
30             } }
31              
32             sub action_get_code_and_meta {
33 666     666 0 142575 my ($self, $req) = @_;
34              
35 666         2028 my $res;
36              
37 666         8306 $res = $self->get_code($req);
38 666 50       253184 return $res if $res;
39              
40 666         5092 $res = $self->get_meta($req);
41 666 50       90348 return $res if $res;
42              
43 666         20048 [200, "OK", [$req->{-code}, $req->{-meta}]];
44             }
45              
46             package Perinci::Tx::Manager;
47              
48             my $proto_v = 2;
49              
50             our $ep = ""; # error prefix
51             our $lp = "[tm]"; # log prefix
52              
53             my $json = JSON::MaybeXS->new->allow_nonref;
54              
55             # this is used for testing purposes only (e.g. to simulate crash)
56             our %_hooks;
57             our %_settings = (
58             default_rollback_on_action_failure => 1,
59             );
60              
61             # note: to avoid confusion, whenever we mention 'transaction' (or tx for short)
62             # in the code, we must always specify whether it is a sqlite tx (sqltx) or a
63             # Rinci tx (Rtx).
64              
65             # note: no method should die(), we should return error response instead. this is
66             # historical (we are called by Perinci::Access::Schemeless and in turn it is
67             # called by Perinci::Access::HTTP::Server, they used to have no wrapper eval(),
68             # but that turns out to be rather unsafe). an exception to this is in _init(),
69             # when we don't want to deal with old data and just die.
70              
71             # note: we have not dealt with sqlite's rowid wraparound. since it's a 64-bit
72             # integer, we're pretty safe. we also usually rely on ctime first for sorting.
73              
74             # new() should return object on success, or an error string if failed (fatal
75             # error). the other methods (internal or external) returns enveloped result.
76             sub new {
77 33     33 1 3481 my ($class, %opts) = @_;
78 33 50       333 return "Please supply pa object" unless blessed $opts{pa};
79             return "pa object must be an instance of Perinci::Access::Schemeless"
80 33 50       329 unless $opts{pa}->isa("Perinci::Access::Schemeless");
81              
82 33         218 my $obj = bless \%opts, $class;
83 33 50       171 if ($opts{data_dir}) {
84 33 100       1065 unless (-d $opts{data_dir}) {
85 4 50       368 mkdir $opts{data_dir} or return "Can't mkdir $opts{data_dir}: $!";
86             }
87             } else {
88 0         0 for ("$ENV{HOME}/.perinci", "$ENV{HOME}/.perinci/.tx") {
89 0 0       0 unless (-d $_) {
90 0 0       0 mkdir $_ or return "Can't mkdir $_: $!";
91             }
92             }
93 0         0 $opts{data_dir} = "$ENV{HOME}/.perinci/.tx";
94             }
95 33         392 my $res = $obj->_init;
96 33 50       237 return $res->[1] unless $res->[0] == 200;
97 33         296 $obj;
98             }
99              
100             sub _lock_db {
101 325     325   2171 my ($self, $shared) = @_;
102              
103 325         2065 eval {
104 325 100       2220 unless ($self->{_lock}) {
105 124         2947 $self->{_lock} = File::Flock::Retry->lock(
106             "$self->{_db_file}.lck", {retries=>5, shared=>1});
107             }
108             };
109 325 50       117430 return [532, "Tx database is still locked by other process ".
110             "(probably recovery) after 5 seconds, giving up: $@"]
111             if $@;
112 325         2186 [200];
113             }
114              
115             sub _unlock_db {
116 91     91   407 my ($self) = @_;
117              
118 91         1001 undef $self->{_lock};
119 91         48612 [200];
120             }
121              
122             sub _init {
123 33     33   148 my ($self) = @_;
124 33         187 my $data_dir = $self->{data_dir};
125 33         319 log_trace("$lp Initializing data dir %s ...", $data_dir);
126              
127 33 100       1709 unless (-d "$self->{data_dir}/.trash") {
128 4 50       193 mkdir "$self->{data_dir}/.trash"
129             or return [532, "Can't create .trash dir: $!"];
130             }
131 33 100       695 unless (-d "$self->{data_dir}/.tmp") {
132 4 50       189 mkdir "$self->{data_dir}/.tmp"
133             or return [532, "Can't create .tmp dir: $!"];
134             }
135              
136 33         369 $self->{_db_file} = "$data_dir/tx.db";
137              
138 33 50       535 (-d $data_dir)
139             or return [532, "Transaction data dir ($data_dir) doesn't exist ".
140             "or not a dir"];
141 33 50       676 my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{_db_file}", undef, undef,
142             {
143             RaiseError => 0,
144             #sqlite_use_immediate_transaction => 1
145             })
146             or return [532, "Can't connect to transaction DB: $DBI::errstr"];
147              
148             # init database
149              
150 33         94527 local $ep = "Can't init tx db:"; # error prefix
151              
152 33 50       420 $dbh->do(<<_) or return [532, "$ep create tx: ". $dbh->errstr];
153             CREATE TABLE IF NOT EXISTS tx (
154             ser_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
155             str_id VARCHAR(200) NOT NULL,
156             owner_id VARCHAR(64) NOT NULL,
157             summary TEXT,
158             status CHAR(1) NOT NULL, -- i, a, C, U, R, u, v, d, e, X [uppercase=final]
159             ctime REAL NOT NULL,
160             commit_time REAL,
161             last_action_id INTEGER,
162             UNIQUE (str_id)
163             )
164             _
165              
166             # for tx with status=i, last_action_id is the in-progress action ID, set
167             # when in the middle of processing actions, then unset again after action
168             # has finished. during recovery, if tx with status=i still has this field
169             # set, it means it has crashed in the middle of action.
170             #
171             # for tx with other transient status (a, u/v, d/e) this field is used to
172             # mark which action has been processed. rollback/roll forward will start
173             # from this action instead of having to start from the first action of
174             # transaction.
175              
176 33 50       557326 $dbh->do(<<_) or return [532, "$ep create do_action: ". $dbh->errstr];
177             CREATE TABLE IF NOT EXISTS do_action (
178             tx_ser_id INTEGER NOT NULL, -- refers tx(ser_id)
179             id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
180             sp TEXT, -- for named savepoint
181             ctime REAL NOT NULL,
182             f TEXT NOT NULL,
183             args TEXT NOT NULL,
184             UNIQUE(sp)
185             )
186             _
187              
188 33 50       112420 $dbh->do(<<_) or return [532, "$ep create undo_action: ". $dbh->errstr];
189             CREATE TABLE IF NOT EXISTS undo_action (
190             tx_ser_id INTEGER NOT NULL, -- refers tx(ser_id)
191             action_id INTEGER NOT NULL, -- refers do_action(id)
192             id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
193             ctime REAL NOT NULL,
194             f TEXT NOT NULL,
195             args TEXT NOT NULL
196             )
197             _
198              
199 33 50       130964 $dbh->do(<<_) or return [532, "$ep create _meta: ".$dbh->errstr];
200             CREATE TABLE IF NOT EXISTS _meta (
201             name TEXT PRIMARY KEY NOT NULL,
202             value TEXT
203             )
204             _
205 33 50       87901 $dbh->do(<<_) or return [532, "$ep insert v: ".$dbh->errstr];
206             -- v is incremented everytime schema changes
207             INSERT OR IGNORE INTO _meta VALUES ('v', '5')
208             _
209              
210             # deal with table structure changes
211             UPDATE_SCHEMA:
212 33         78799 while (1) {
213 33         733 my ($v) = $dbh->selectrow_array(
214             "SELECT value FROM _meta WHERE name='v'");
215 33 50       8381 if ($v <= 3) {
    50          
216              
217             # changes incompatible (no longer undo_step and redo_step tables),
218             # can lose data. we bail and let user decide for herself.
219              
220 0         0 die join(
221             "",
222             "Your transaction database ($self->{_db_file}) is still at v=3",
223             ", there is incompatible changes with newer version. ",
224             "Either delete the transaction database (and lose undo data) ",
225             "or use an older version of ".__PACKAGE__." (0.28 or older).\n",
226             );
227              
228             } elsif ($v == 4) {
229              
230 0         0 eval {
231 0         0 local $dbh->{RaiseError} = 1;
232 0         0 $dbh->begin_work;
233              
234             # rename field: last_call_id -> last_action_id
235 0         0 $dbh->do("ALTER TABLE tx RENAME TO tmp_tx");
236 0         0 $dbh->do(<<'_');
237             CREATE TABLE IF NOT EXISTS tx (
238             ser_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
239             str_id VARCHAR(200) NOT NULL,
240             owner_id VARCHAR(64) NOT NULL,
241             summary TEXT,
242             status CHAR(1) NOT NULL, -- i, a, C, U, R, u, v, d, e, X [uppercase=final]
243             ctime REAL NOT NULL,
244             commit_time REAL,
245             last_action_id INTEGER,
246             UNIQUE (str_id)
247             )
248             _
249 0         0 $dbh->do(<<'_');
250             INSERT INTO tx (ser_id,str_id,owner_id,summary,status,ctime,commit_time,last_action_id)
251             SELECT ser_id,str_id,owner_id,summary,status,ctime,commit_time,last_call_id FROM tmp_tx
252             _
253              
254 0         0 $dbh->do("DROP TABLE tmp_tx");
255 0         0 $dbh->do("DROP TABLE call");
256 0         0 $dbh->do("DROP TABLE undo_call");
257 0         0 $dbh->do("UPDATE _meta SET value='5' WHERE name='v'");
258             # delete column sp, not yet
259 0         0 $dbh->commit;
260             };
261 0         0 my $e = $@;
262 0 0       0 do { $dbh->rollback; die $e } if $e;
  0         0  
  0         0  
263              
264             } else {
265             # already the latest schema version
266 33         244 last UPDATE_SCHEMA;
267             }
268             }
269              
270 33         167 $self->{_dbh} = $dbh;
271 33         316 log_trace("$lp Data dir initialization finished");
272 33         507 $self->_recover;
273             }
274              
275             sub get_trash_dir {
276 0     0 1 0 my ($self) = @_;
277 0         0 my $tx = $self->{_cur_tx};
278 0 0       0 return [412, "No current transaction, won't create trash dir"] unless $tx;
279 0         0 my $d = "$self->{data_dir}/.trash/$tx->{ser_id}";
280 0 0       0 unless (-d $d) {
281 0 0       0 mkdir $d or return [532, "Can't mkdir $d: $!"];
282             }
283 0         0 [200, "OK", $d];
284             }
285              
286             sub get_tmp_dir {
287 0     0 1 0 my ($self) = @_;
288 0         0 my $tx = $self->{_cur_tx};
289 0 0       0 return [412, "No current transaction, won't create tmp dir"] unless $tx;
290 0         0 my $d = "$self->{data_dir}/.tmp/$tx->{ser_id}";
291 0 0       0 unless (-d $d) {
292 0 0       0 mkdir $d or return [532, "Can't mkdir $d: $!"];
293             }
294 0         0 [200, "OK", $d];
295             }
296              
297             sub get_func_and_meta {
298 666     666 0 3523 my ($self, $func) = @_;
299              
300 666 50       13435 my ($module, $leaf) = $func =~ /(.+)::(.+)/
301             or return [400, "Not a valid fully qualified function name: $func"];
302 666         2660 my $module_p = $module; $module_p =~ s!::!/!g; $module_p .= ".pm";
  666         2947  
  666         2199  
303 666         1997 eval { require $module_p };
  666         13516  
304 666         6045 my $req_err = $@;
305 666 50       7535 if ($req_err) {
    50          
306 0 0       0 if (!package_exists($module)) {
    0          
307 0         0 return [532, "Can't load module $module (probably ".
308             "mistyped or missing module): $req_err"];
309             } elsif ($req_err !~ m!Can't locate!) {
310 0         0 return [532, "Can't load module $module (probably ".
311             "compile error): $req_err"];
312             }
313             # require error of "Can't locate ..." can be ignored. it
314             # might mean package is already defined by other code. we'll
315             # try and access it anyway.
316             } elsif (!package_exists($module)) {
317             # shouldn't happen
318 0         0 return [532, "Module loaded OK, but no $module package ".
319             "found, something's wrong"];
320             }
321             # get metadata as well as wrapped
322 666         29954 my $res = $self->{pa}->request(get_code_and_meta => "/$module/$leaf");
323 666         17267 $res;
324             }
325              
326             # about _in_sqltx: DBI/DBD::SQLite currently does not support checking whether
327             # we are in an active sqltx, except $dbh->{BegunWork} which is undocumented. we
328             # use our own flag here.
329              
330             # just a wrapper to avoid error when rollback with no active tx
331             sub _rollback_dbh {
332 185     185   1245 my $self = shift;
333 185 100       7595 $self->{_dbh}->rollback if $self->{_in_sqltx};
334 185         935 $self->{_in_sqltx} = 0;
335 185         1406 [200];
336             }
337              
338             # just a wrapper to avoid error when committing with no active tx
339             sub _commit_dbh {
340 184     184   1048 my $self = shift;
341 184 100       1593 return [200] unless $self->{_in_sqltx};
342 106         2308523 my $res = $self->{_dbh}->commit;
343 106         1893 $self->{_in_sqltx} = 0;
344 106 50       1750 $res ? [200] : [532, "db: Can't commit: ".$self->{_dbh}->errstr];
345             }
346              
347             # just a wrapper to avoid error when beginning twice
348             sub _begin_dbh {
349 234     234   1144 my $self = shift;
350 234 100       1626 return [200] if $self->{_in_sqltx};
351 232         3731 my $res = $self->{_dbh}->begin_work;
352 232         19222 $self->{_in_sqltx} = 1;
353 232 50       2143 $res ? [200] : [532, "db: Can't begin: ".$self->{_dbh}->errstr];
354             }
355              
356             sub _test_tx_support {
357 666     666   3146 my ($self, $meta) = @_;
358 666   100     5528 my $ff = $meta->{features} // {};
359             $ff->{tx} or
360 666 100       3236 return [412, "function does not support transaction"];
361 665 50 50     5825 ($ff->{tx}{v} // 1) == $proto_v
362             or return [412, "function does not support correct transaction ".
363             "protocol version (v=$proto_v needed)"];
364             $ff->{idempotent} or
365 665 50       2892 return [412, "function does not declare idempotent feature"];
366 665         3811 [200];
367             }
368              
369             # check actions. actions should be [[f,args,JSON(args),cid?,\&code?,$meta?],
370             # ...]. this function will check whether function name is valid, whether
371             # arguments can be deserialized, etc. modify actions in-place (e.g. qualify
372             # function names if $opts->{qualify} is set, decode/encode JSON for arguments,
373             # cache function in [4], cache meta in [5]).
374             sub _check_actions {
375 466     466   3394 my ($self, $actions, $opts) = @_;
376 466   100     4431 $opts //= {};
377 466 50       3244 return [532, "BUG: argument 'actions' not an array"]
378             unless ref($actions) eq 'ARRAY';
379 466         1612 my $i = 0;
380 466         2187 for my $a (@$actions) {
381 667         2291 $i++;
382 667         3934 local $ep = "action #$i ($a->[0]): invalid action";
383 667 50       3940 return [532, "$ep: not an array"] unless ref($a) eq 'ARRAY';
384             $a->[0] = "$opts->{qualify}::$a->[0]"
385 667 100 66     6994 if $opts->{qualify} && $a->[0] !~ /::/;
386 667 50       10242 return [532, "$ep: invalid function name"]
387             unless $a->[0] =~ /\A\w+(::\w+)+\z/;
388 667         2383 eval {
389 667 100       3987 if ($a->[2]) {
    50          
390 241         4818 $a->[1] = $json->decode($a->[2]);
391             } elsif ($a->[1]) {
392 426         7705 $a->[2] = $json->encode($a->[1]);
393             }
394             };
395 667 100       3800 return [532, "$ep: can't decode/encode JSON arguments: $@"] if $@;
396 666         3944 my $res = $self->get_func_and_meta($a->[0]);
397 666 50       13260 return err(532, "$ep: can't get metadata", $res)
398             unless $res->[0] == 200;
399 666         2158 my ($func, $meta) = @{$res->[2]};
  666         3147  
400 666         4505 $res = $self->_test_tx_support($meta);
401 666 100       4507 return err(532, "$ep: function does not pass tx support test", $res)
402             unless $res->[0] == 200;
403 665         4152 $a->[4] = $func;
404 665         12515 $a->[5] = $meta;
405             }
406 464         2679 [200];
407             }
408              
409             sub _set_tx_status_before_or_after_actions {
410 287     287   2035 my ($self, $which0, $whicha) = @_;
411              
412 287         1520 my $dbh = $self->{_dbh};
413 287         1606 my $tx = $self->{_cur_tx};
414              
415 287         3187 my $os = $tx->{status};
416 287         999 my $ns; # temporary new status during processing
417             my $fs; # desired final status
418 287 100       3067 if ($whicha eq 'action') {
    100          
    100          
    50          
419             # no change is expected
420 132         408 $ns = $os;
421 132         2410 $fs = $os;
422             } elsif ($whicha eq 'rollback') {
423 52 100       469 $ns = $os eq 'i' ? 'a' : $os eq 'u' ? 'v' : $os eq 'd' ? 'e' : $os;
    100          
    100          
424 52 100 100     879 $fs = $os eq 'u'||$ns eq 'v' ? 'C' : $os eq 'd'||$ns eq 'e' ? 'U' : 'R';
    100 100        
425             } elsif ($whicha eq 'undo') {
426 66         233 $ns = 'u';
427 66         374 $fs = 'U';
428             } elsif ($whicha eq 'redo') {
429 37         166 $ns = 'd';
430 37         129 $fs = 'C';
431             }
432              
433 287 100       1857 if ($which0 eq 'before') {
434 171 100       1794 if ($ns ne $os) {
435 85         1440 log_trace("$lp Setting transient transaction status ".
436             "%s -> %s ...", $os, $ns);
437             $dbh->do("UPDATE tx SET status='$ns', last_action_id=NULL ".
438             "WHERE ser_id=?", {}, $tx->{ser_id})
439 85 50       1681 or return [532, "db: Can't update tx status $os -> $ns: ".
440             $dbh->errstr];
441             # to make sure, check once again if Rtx status is indeed updated
442             my @r = $dbh->selectrow_array(
443 85         1775548 "SELECT status FROM tx WHERE ser_id=?", {}, $tx->{ser_id});
444 85 50       32707 return [532, "Can't update tx status #3 ".
445             "(tx doesn't exist in db)"] unless @r;
446 85 50       673 return [532, "Can't update tx status #2 ".
447             "(wants $ns, still $r[0])"]
448             unless $r[0] eq $ns;
449             # update row cache
450 85         610 $tx->{status} = $ns; $tx->{last_action_id} = undef;
  85         872  
451             }
452             }
453 287         1403 $os = $ns;
454              
455 287 100       2445 if ($which0 eq 'after') {
456 116 100       3160 if ($whicha eq 'action') {
457             # reset last_action_id to mark that we are finished
458             $dbh->do("UPDATE tx SET last_action_id=NULL ".
459             "WHERE ser_id=?", {}, $tx->{ser_id})
460 55 50       1238 or return [532, "db: Can't update last_action_id->NULL: ".
461             $dbh->errstr];
462             }
463              
464 116 100       837019 if ($os ne $fs) {
465 61         1015 log_trace("$lp Setting final transaction status %s -> %s ...",
466             $ns, $fs);
467             $dbh->do("UPDATE tx SET status='$fs',last_action_id=NULL ".
468             "WHERE ser_id=?",
469             {}, $tx->{ser_id})
470 61 50       1411 or return [532, "db: Can't set tx status to $fs: ".
471             $dbh->errstr];
472             # update row cache
473 61         1426624 $tx->{status} = $fs; $tx->{last_action_id} = undef;
  61         738  
474             }
475             }
476              
477 287         3188 [200];
478             }
479              
480             sub _set_tx_status_before_actions {
481 171     171   764 my $self = shift;
482 171         1303 $self->_set_tx_status_before_or_after_actions('before', @_);
483             }
484              
485             sub _set_tx_status_after_actions {
486 116     116   499 my $self = shift;
487 116         1158 $self->_set_tx_status_before_or_after_actions('after', @_);
488             }
489              
490             # return enveloped actions (arrayref)
491             sub _get_actions_from_db {
492 94     94   467 my ($self, $which) = @_;
493              
494             # for safety, we shouldn't call this function when which='action' anyway
495 94 50       944 return [200, "OK", []] if $which eq 'action';
496              
497 94         902 my $dbh = $self->{_dbh};
498 94         377 my $tx = $self->{_cur_tx};
499              
500 94 100 100     1776 my $t = $which eq 'redo' || $which eq 'rollback' && $tx->{status} eq 'v' ?
501             'do_action' : 'undo_action';
502              
503 94         381 my $lai = $tx->{last_action_id};
504             my $actions = $dbh->selectall_arrayref(
505             "SELECT f, NULL, args, id FROM $t WHERE tx_ser_id=? ".
506             ($lai ? "AND (id<>$lai AND ".
507             "ctime <= (SELECT ctime FROM $t WHERE id=$lai)) " : "").
508 94 100       2700 "ORDER BY ctime, id", {}, $tx->{ser_id});
509 94         35048 [200, "OK", [reverse @$actions]];
510             }
511              
512             # return enveloped undo actions (arrayref), this is currently used for debugging
513             sub _get_undo_actions_from_db {
514 0     0   0 my ($self, $which) = @_;
515              
516             # rollback does not record undo actions in db
517 0 0       0 return [200, "OK", []] if $which eq 'rollback';
518              
519 0         0 my $dbh = $self->{_dbh};
520 0         0 my $tx = $self->{_cur_tx};
521             my $t = $which eq 'redo' || $which eq 'rollback' && $tx->{status} eq 'v' ||
522             # we can also invoke actions during undo
523             ($which eq 'action' && !$self->{_in_undo})
524 0 0 0     0 ? 'undo_action' : 'do_action';
525              
526             my $actions = $dbh->selectall_arrayref(
527             "SELECT f, NULL, args, id FROM $t WHERE tx_ser_id=? ".
528 0         0 "ORDER BY ctime, id", {}, $tx->{ser_id});
529 0         0 [200, "OK", [reverse @$actions]];
530             }
531              
532             sub _collect_stash {
533 506     506   3366 my ($self, $res) = @_;
534 506         3053 my $s = $res->[3]{stash};
535 506 50       4596 return [200] unless ref($s) eq 'HASH';
536 0         0 $self->{_stash}{$_} = $s->{$_} for keys %$s;
537 0         0 [200];
538             }
539              
540             sub _perform_action {
541 275     275   1427 my ($self, $which, $action, $opts) = @_;
542 275         1571 my $res;
543              
544 275         1138 my $dbh = $self->{_dbh};
545 275         1131 my $tx = $self->{_cur_tx};
546              
547 275         1393 my %args = %{$action->[1]};
  275         3460  
548 275         2319 $args{-tx_v} = $proto_v;
549 275 100       2862 $args{-tx_rollback} = 1 if $which eq 'rollback';
550 275 100       1762 $args{-tx_recovery} = 1 if $self->{_in_recovery};
551 275 100       1955 $args{-confirm} = 1 if $opts->{confirm};
552 275   50     4068 my $dd = $action->[5]{deps} // {};
553 275 50       2131 if ($dd->{tmp_dir}) { # XXX actually need to use dep_satisfy_rel
554 0         0 $res = $self->get_tmp_dir;
555 0 0       0 return err(412, "Can't get tmp dir", $res) unless $res->[0] == 200;
556 0         0 $args{-tmp_dir} = $res->[2];
557             }
558 275 50       1283 if ($dd->{trash_dir}) { # XXX actually need to use dep_satisfy_rel
559 0         0 $res = $self->get_trash_dir;
560 0 0       0 return err($res, "Can't get trash dir", $res) unless $res->[0] == 200;
561 0         0 $args{-trash_dir} = $res->[2];
562             }
563 275         2871 $args{-stash} = $self->{_stash};
564              
565             # call the first time, to get undo actions
566              
567 275         1664 $args{-tx_action} = 'check_state';
568 275         2604 $args{-tx_action_id} = UUID::Random::generate();
569 275         50404 $self->{_res} = $res = $action->[4]->(%args);
570 275         92921 log_trace("$lp check_state args: %s, result: %s", \%args, $res);
571 275 100 100     3635 return err(532, "$ep: Check state failed", $res)
572             unless $res->[0] == 200 || $res->[0] == 304;
573 272 50 66     5644 log_debug($res->[1]) if $res->[0] == 200 && $res->[1];
574 272   100     2621 my $undo_actions = $res->[3]{undo_actions} // [];
575 272         1135 my $do_actions = $res->[3]{do_actions};
576 272         1882 $self->_collect_stash($res);
577              
578 272         1619 for ('after_check_state') {
579 272 50       1977 last unless $_hooks{$_};
580 0         0 log_trace("$lp hook: $_");
581 0         0 $_hooks{$_}->($self, which=>$which, action=>$action, res=>$res);
582             }
583              
584 272         1164 my $pkg = $action->[0]; $pkg =~ s/::\w+\z//;
  272         2532  
585 272         3402 $res = $self->_check_actions($undo_actions, {qualify=>$pkg});
586 272 50       2222 return $res unless $res->[0] == 200;
587              
588 272 100       2332 if ($do_actions) {
589 23         168 $res = $self->_check_actions($do_actions, {qualify=>$pkg});
590 23 50       250 return $res unless $res->[0] == 200;
591             }
592              
593             # record action
594              
595 272 50 66     5273 if ($which eq 'action' && !$self->{_in_undo} && !$self->{_in_redo}) {
      66        
596 115         457 my $t = 'do_action';
597             $dbh->do("INSERT INTO $t (tx_ser_id,ctime,f,args) ".
598             "VALUES (?,?,?,?)", {},
599 115 50       3334 $tx->{ser_id}, time(), $action->[0], $action->[2])
600             or return [532, "$ep: db: can't insert $t: ".$dbh->errstr];
601 115         2243807 my $action_id = $dbh->last_insert_id("","","","");
602             $dbh->do("UPDATE tx SET last_action_id=? WHERE ser_id=?", {},
603             $action_id, $tx->{ser_id})
604 115 50       2620 or return [532, "$ep: db: can't set last_action_id: ".$dbh->errstr];
605 115         1661911 $action->[3] = $action_id;
606             }
607              
608             # record undo actions. rollback doesn't need to do this, failure in rollback
609             # will result in us giving up anyway.
610              
611 272 100 100     5259 unless ($which eq 'rollback' || $do_actions) {
612             # no BEGIN + COMMIT is needed here, because actions have not been
613             # performed. all these undo actions should return 304 anyway if
614             # performed during rollback
615 207         1619 my $j = 0;
616 207         1587 for my $ua (@$undo_actions) {
617 192         1990 local $ep = "$ep undo_actions[$j] ($ua->[0])";
618 192 100       2105 if ($self->{_in_undo}) {
619             $dbh->do(
620             "INSERT INTO do_action (tx_ser_id,ctime,f,args) ".
621             "VALUES (?,?,?,?)", {},
622 68 50       2128 $tx->{ser_id}, time(), $ua->[0], $ua->[2])
623             or return [532, "$ep: db: can't insert undo_action: ".
624             $dbh->errstr];
625             } else {
626             $dbh->do(
627             "INSERT INTO undo_action(tx_ser_id,action_id,ctime,f,args)".
628             "VALUES (?,?,?,?,?)", {},
629 124 50       3673 $tx->{ser_id}, $action->[3], time(), $ua->[0], $ua->[2])
630             or return [532, "$ep: db: can't insert do_action: ".
631             $dbh->errstr];
632             }
633 192         3583848 $j++;
634             }
635             }
636              
637             # call function "for real" this time
638              
639 272 100 66     4793 if ($do_actions && @$do_actions) {
    100          
640              
641 23         137 for ('before_inner_action') {
642 23 50       177 last unless $_hooks{$_};
643 0         0 log_trace("$lp hook: $_");
644 0         0 $_hooks{$_}->($self, which=>$which, actions=>$do_actions);
645             }
646              
647 23         712 $res = $self->_action($do_actions, $opts);
648 23 100       1778 return $res unless $res->[0] == 200;
649              
650 16         78 for ('after_inner_action') {
651 16 50       135 last unless $_hooks{$_};
652 0         0 log_trace("$lp hook: $_");
653 0         0 $_hooks{$_}->($self, which=>$which,actions=>$do_actions,res=>$res);
654             }
655              
656             } elsif ($self->{_res}[0] == 200) {
657 234         2794 $args{-tx_action} = 'fix_state';
658 234         20732 $self->{_res} = $res = $action->[4]->(%args);
659 234         68686 log_trace("$lp fix_state args: %s, result: %s", \%args, $res);
660 234 50 33     3459 return [532, "$ep: action failed", $res]
661             unless $res->[0] == 200 || $res->[0] == 304;
662 234         2243 $self->_collect_stash($res);
663             }
664              
665 265         1843 for ('after_fix_state') {
666 265 100       2028 last unless $_hooks{$_};
667 126         1119 log_trace("$lp hook: $_");
668 126         1718 $_hooks{$_}->($self, which=>$which, action=>$action, res=>$res);
669             }
670              
671             # update last_action_id so we don't have to repeat all steps
672             # after recovery. error can be ignored here, i think.
673              
674 222 100       1702 unless ($which eq 'action') {
675             $dbh->do("UPDATE tx SET last_action_id=? WHERE ser_id=?", {},
676 124         3093 $action->[3], $tx->{ser_id});
677             }
678              
679 222         2298551 [200];
680             }
681              
682             # rollback, undo, redo, action are all action loops. we combine them here into a
683             # common routine.
684             sub _action_loop {
685             # $actions is only for which='action'. for rollback/undo/redo, $actions is
686             # taken from the database table.
687 171     171   1217 my ($self, $which, $actions, $opts) = @_;
688 171   100     1336 $opts //= {};
689 171   100     2206 $opts->{rollback} //= $_settings{default_rollback_on_action_failure};
690              
691 171         912 my $res;
692              
693 171 100 100     4490 local $self->{_action_nest_level} = ($self->{_action_nest_level}//0) + 1
694             if $which eq 'action';
695              
696             local $lp = "[tm] [".
697             "$which".
698 171 100       1742 ($self->{_action_nest_level} ? "($self->{_action_nest_level})":"").
699             "]";
700              
701 171 50       2094 return [532, "BUG: 'which' must be rollback/undo/redo/action"]
702             unless $which =~ /\A(rollback|undo|redo|action)\z/;
703              
704             # this prevents an endless loop in rollback, since we call functions when doing
705             # rollback, and functions might call $tx->rollback too upon failure.
706 171 50 33     1442 return if $self->{_in_rollback} && $which eq 'rollback';
707 171 100       1054 local $self->{_in_rollback} = 1 if $which eq 'rollback';
708              
709 171 100       923 local $self->{_in_undo} = 1 if $which eq 'undo';
710 171 100       1380 local $self->{_in_redo} = 1 if $which eq 'redo';
711              
712 171         687 my $tx = $self->{_cur_tx};
713 171 50       1027 return [532, "called w/o Rinci transaction, probably a bug"] unless $tx;
714              
715 171         621 my $dbh = $self->{_dbh};
716 171         1297 $self->_rollback_dbh;
717             # we're now in sqlite autocommit mode, we use this mode for the following
718             # reasons: 1) after we set Rtx status to a/e/v/u/d, we need other clients to
719             # immediately see this, so e.g. if Rtx was i, they do not try to add steps
720             # to it. also, when performing actions, we want to update+commit after each
721             # action.
722              
723             # first we need to set the appropriate transaction status, to prevent
724             # other clients from interfering/racing.
725 171         1205 $res = $self->_set_tx_status_before_actions($which);
726 171 50       1191 return $res unless $res->[0] == 200;
727              
728 171         1516 $self->{_stash} = {};
729              
730             # for the main processing, we setup a giant eval loop. any error during
731             # processing, we return() from the eval and trigger a rollback (unless we
732             # are the rollback process itself, in which case we set tx status to X and
733             # give up).
734 171         943 my $eval_res = eval {
735 171 100       1630 $actions = $self->_get_actions_from_db($which)->[2] unless $actions;
736             log_trace("$lp Actions to perform: %s",
737 171   66     2992 [map {[$_->[0], $_->[2] // $_->[1]]} @$actions]);
  295         6634  
738              
739             # check the actions
740 171         3971 $res = $self->_check_actions($actions);
741 171 100       1510 return $res unless $res->[0] == 200;
742              
743 169         764 my $i = 0;
744 169         1379 for my $action (@$actions) {
745 275         1304 $i++;
746 275         3902 local $lp = "$lp [action #$i/".scalar(@$actions)." ($action->[0])]";
747 275         4169 local $ep = "action #$i/".scalar(@$actions)." ($action->[0])";
748 275         2094 $res = $self->_perform_action($which, $action, $opts);
749 232 100       4195 return $res unless $res->[0] == 200;
750             }
751              
752 116         1102 $res = $self->_set_tx_status_after_actions($which);
753 116 50       1270 return $res unless $res->[0] == 200;
754              
755 116         1111 [200];
756             }; # eval
757 171         1210 my $eval_err = $@;
758              
759 171 100 100     2482 if ($eval_err || $eval_res->[0] != 200) {
760 55 100 100     931 if ($which eq 'rollback') {
    100 100        
761             # if failed during rolling back, we don't know what else to do. we
762             # set Rtx status to X (inconsistent) and ignore it.
763             $dbh->do("UPDATE tx SET status='X' WHERE ser_id=?",
764 12         7547 {}, $tx->{ser_id});
765 12 50       204665 return $eval_err ?
766             err(532, "died during rollback: $eval_err") :
767             err(532, "error during rollback", $eval_res);
768             } elsif (!$opts->{rollback} || ($self->{_action_nest_level}//0) > 1) {
769             # do not rollback nested action or if told not to rollback
770 16 50       242 return $eval_err ?
771             err(532, "died during nested action (no rollback): $eval_err") :
772             err(532, "error during nested action (no rollback)", $eval_res);
773             } else {
774 27         614 my $rbres = $self->_rollback;
775 27 100       197 if ($rbres->[0] != 200) {
776 12         59 $rbres->[3]{prev} = $eval_res;
777 12 100       131 return $eval_err ?
778             err(532, $eval_err." (rollback failed)", $rbres) :
779             err(532, "$eval_res->[0] - $eval_res->[1] ".
780             "(rollback failed)", $rbres);
781             } else {
782 15 100       408 return $eval_err ?
783             err(532, $eval_err." (rolled back)", $eval_res) :
784             err(532, "$eval_res->[0] - $eval_res->[1] (rolled back)",
785             $eval_res);
786             }
787             }
788             }
789              
790 116 50       1840 if (log_is_trace) {
791 0         0 my $undo_actions = $self->_get_undo_actions_from_db($which)->[2];
792             log_trace("$lp Recorded undo actions: %s",
793 0 0       0 [map {[$_->[0], $_->[2]]} @$undo_actions])
  0         0  
794             if $undo_actions;
795             }
796              
797 116         3180 [200];
798             }
799              
800             sub _cleanup {
801 58     58   281 my ($self, $which) = @_;
802 58         781 log_trace("$lp Performing cleanup ...");
803              
804             # there should be only one process running
805 58         2094 my $res = $self->_lock_db(undef);
806 58 50       415 return $res unless $res->[0] == 200;
807              
808 58         1216 my $data_dir = $self->{data_dir};
809 58         214 my $dbh = $self->{_dbh};
810              
811 58         302 for my $subd (".trash", ".tmp") {
812 116         608 my $dir = "$data_dir/$subd";
813 116 50       3416 (-d $dir) or next;
814 116         9644 opendir my($dh), $dir;
815 116         1726 my @dirs = grep {/^\d+$/} readdir($dh);
  232         1492  
816 116         1372 closedir $dh;
817 256         50452 my @tx_ids = map {$_->[0]}
818 116   50     493 @{ $dbh->selectall_arrayref("SELECT ser_id FROM tx") // []};
  116         5750  
819 116         2737 for my $tx_id (@dirs) {
820 0 0       0 next if $tx_id ~~ @tx_ids;
821 0         0 log_trace("Deleting %s ...", "$dir/$tx_id");
822 0         0 remove "$dir/$tx_id";
823             }
824             }
825              
826 58         595 $self->discard_all(status=>['R','X']);
827              
828             # XXX also discard all C/U Rtxs that are too old
829              
830             # XXX also roll back all i Rtxs that have been going around for too
831             # long
832              
833 58         1727 log_trace("$lp Finished cleanup");
834 58         617 $self->_unlock_db;
835              
836 58         646 [200];
837             }
838              
839             sub _recover {
840 33     33   152 my ($self, $which) = @_;
841              
842 33         230 log_trace("$lp Performing recovery ...");
843 33         248 local $self->{_in_recovery} = 1;
844              
845             # there should be only one process running
846 33         274 my $res = $self->_lock_db(undef);
847 33 50       220 return $res unless $res->[0] == 200;
848              
849 33         1070 my $dbh = $self->{_dbh};
850 33         97 my $sth;
851              
852             # rollback all transactions that need to be rolled back (crashed
853             # in-progress, failed undo, failed redo)
854 33         3388 $sth = $dbh->prepare(
855             "SELECT * FROM tx WHERE status IN ('a', 'v', 'e') ".
856             "OR (status='i' AND last_action_id IS NOT NULL)".
857             "ORDER BY ctime DESC",
858             );
859 33 50       18595 $sth->execute or return [532, "db: Can't select tx: ".$dbh->errstr];
860 33         1423 while (my $row = $sth->fetchrow_hashref) {
861 0         0 $self->{_cur_tx} = $row;
862 0         0 $self->_rollback;
863             }
864              
865             # continue interrupted undo
866 33         566 $sth = $dbh->prepare(
867             "SELECT * FROM tx WHERE status IN ('u') ".
868             "ORDER BY ctime DESC",
869             );
870 33 50       11433 $sth->execute or return [532, "db: Can't select tx: ".$dbh->errstr];
871 33         1057 while (my $row = $sth->fetchrow_hashref) {
872 4         25 $self->{_cur_tx} = $row;
873 4         28 $self->_undo;
874             }
875              
876             # continue interrupted redo
877 33         407 $sth = $dbh->prepare(
878             "SELECT * FROM tx WHERE status IN ('d') ".
879             "ORDER BY ctime ASC",
880             );
881 33 50       7143 $sth->execute or return [532, "db: Can't select tx: ".$dbh->errstr];
882 33         1148 while (my $row = $sth->fetchrow_hashref) {
883 5         36 $self->{_cur_tx} = $row;
884 5         98 $self->_redo;
885             }
886              
887             EXIT_RECOVERY:
888 33         396 $self->_unlock_db;
889 33         393 log_trace("$lp Finished recovery");
890 33         1449 [200];
891             }
892              
893             sub _resp_incorrect_tx_status {
894 2     2   9 my ($self, $r) = @_;
895              
896 2         15 state $statuses = {
897             i => 'still in-progress',
898             a => 'aborted, further requests ignored until rolled back',
899             v => 'aborted undo, further requests ignored until rolled back',
900             e => 'aborted redo, further requests ignored until rolled back',
901             C => 'already committed',
902             R => 'already rolled back',
903             U => 'already committed+undone',
904             u => 'undoing',
905             d => 'redoing',
906             X => 'inconsistent',
907             };
908              
909 2         6 my $s = $r->{status};
910 2   50     12 my $ss = $statuses->{$s} // "unknown (bug)";
911 2         37 [480, "tx #$r->{ser_id}: Incorrect status, status is '$s' ($ss)"];
912             }
913              
914             # all methods that work inside a transaction have some common code, e.g.
915             # database file locking, starting sqltx, checking Rtx status, etc. hence
916             # refactored into _wrap(). arguments:
917             #
918             # - label (string, just a label for logging)
919             #
920             # - args* (hashref, arguments to method)
921             #
922             # - cleanup (bool, default 0). whether to run cleanup first before code. this is
923             # currently run by begin() only, to make room by purging old transactions.
924             #
925             # - tx_status (str/array, if set then it means method requires Rtx to exist and
926             # have a certain status(es))
927             #
928             # - code (coderef, main method code, will be passed args as hash)
929             #
930             # - rollback (bool, whether we should do rollback if code does not return
931             # success)
932             #
933             # - hook_check_args (coderef, will be passed args as hash)
934             #
935             # - hook_after_commit (coderef, will be passed args as hash).
936             #
937             # _wrap() will also put the current Rtx record into $self->{_cur_tx}
938             #
939             # return enveloped result
940             sub _wrap {
941 234     234   2103 my ($self, %wargs) = @_;
942             my $margs = $wargs{args}
943 234 50       1715 or return [532, "BUG: args not passed to _wrap()"];
944 234         4266 my @caller = caller(1);
945              
946 234         1414 my $res;
947              
948 234         6711 $res = $self->_lock_db("shared");
949 234 50       1462 return [532, "Can't acquire lock: $res"] unless $res->[0] == 200;
950              
951 234         10229 $self->{_now} = time();
952              
953             # initialize & check tx_id argument
954 234   66     2039 $margs->{tx_id} //= $self->{_tx_id};
955 234         7318 my $tx_id = $margs->{tx_id};
956 234         990 $self->{_tx_id} = $tx_id;
957              
958 234 50 33     2574 return [400, "Please specify tx_id"]
959             unless defined($tx_id) && length($tx_id);
960 234 50       1372 return [400, "Invalid tx_id, please use 1-200 characters only"]
961             unless length($tx_id) <= 200;
962              
963 234         4619 my $dbh = $self->{_dbh};
964              
965 234 100       1289 if ($wargs{cleanup}) {
966 58         576 $res = $self->_cleanup;
967 58 50       491 return err(532, "Can't succesfully cleanup", $res)
968             unless $res->[0] == 200;
969             }
970              
971             # we need to begin sqltx here so that client's actions like rollback() and
972             # commit() are indeed atomic and do not interfere with other clients'.
973              
974 234 50       2481 $self->_begin_dbh or return [532, "db: Can't begin: ".$dbh->errstr];
975              
976 234         6592 my $cur_tx = $dbh->selectrow_hashref(
977             "SELECT * FROM tx WHERE str_id=?", {}, $tx_id);
978 234         143408 $self->{_cur_tx} = $cur_tx;
979              
980 234 50       3036 if ($wargs{hook_check_args}) {
981 0         0 $res = $wargs{hook_check_args}->(%$margs);
982 0 0       0 do { $self->_rollback; return err(532, "hook_check_args failed", $res) }
  0         0  
  0         0  
983             unless $res->[0] == 200;
984             }
985              
986 234 100       1752 if ($wargs{tx_status}) {
987 176 100       1227 if (!$cur_tx) {
988 12         115 $self->_rollback_dbh;
989 12         362 return [484, "No such transaction"];
990             }
991 164         566 my $ok;
992             # 'str' ~~ $aryref doesn't seem to work?
993 164 50       1277 if (ref($wargs{tx_status}) eq 'ARRAY') {
994 164         751 $ok = $cur_tx->{status} ~~ @{$wargs{tx_status}};
  164         8085  
995             } else {
996 0         0 $ok = $cur_tx->{status} ~~ $wargs{tx_status};
997             }
998 164 100       1758 unless ($ok) {
999 2         12 $self->_rollback_dbh;
1000 2         14 return $self->_resp_incorrect_tx_status($cur_tx);
1001             }
1002             }
1003              
1004 220 50       1682 if ($wargs{code}) {
1005 220         2268 $res = $wargs{code}->(%$margs, _tx=>$cur_tx);
1006             # on error, rollback and skip the rest
1007 220 100       5442 if ($res->[0] >= 400) {
1008             $self->_rollback if $wargs{rollback} // 1
1009 36 50 50     438 && ($res->[3]{rollback} // 1);
      66        
1010 36         986 return $res;
1011             }
1012             }
1013              
1014 184         1729 my $res2 = $self->_commit_dbh;
1015 184 50       2154 return $res2 unless $res2->[0] == 200;
1016              
1017 184 50       6133 if ($wargs{hook_after_commit}) {
1018 0         0 $res2 = $wargs{hook_after_tx}->(%$margs);
1019 0 0       0 return err(532, "hook_after_tx failed", $res2) unless $res2->[0] == 200;
1020             }
1021              
1022 184         8959 $res;
1023             }
1024              
1025             # all methods that don't work inside a transaction have some common code, e.g.
1026             # database file locking. arguments:
1027             #
1028             # - args* (hashref, arguments to method)
1029             #
1030             # - lock_db (bool, default false)
1031             #
1032             # - code* (coderef, main method code, will be passed args as hash)
1033             #
1034             # return enveloped result
1035             sub _wrap2 {
1036 141     141   1739 my ($self, %wargs) = @_;
1037             my $margs = $wargs{args}
1038 141 50       1529 or return [532, "BUG: args not passed to _wrap()"];
1039 141         2882 my @caller = caller(1);
1040              
1041 141         931 my $res;
1042              
1043 141 50       1006 if ($wargs{lock_db}) {
1044 0         0 $res = $self->_lock_db("shared");
1045 0 0       0 return err(532, "Can't acquire lock", $res) unless $res->[0] == 200;
1046             }
1047              
1048 141         1421 $res = $wargs{code}->(%$margs);
1049              
1050 141 50       2446 if ($wargs{lock_db}) {
1051 0         0 $self->_unlock_db;
1052             }
1053              
1054 141         2413 $res;
1055             }
1056              
1057             sub begin {
1058 58     58 1 279549 my ($self, %args) = @_;
1059             $self->_wrap(
1060             args => \%args,
1061             cleanup => 1,
1062             code => sub {
1063 58     58   259 my $dbh = $self->{_dbh};
1064             my $r = $dbh->selectrow_hashref("SELECT * FROM tx WHERE str_id=?",
1065 58         1155 {}, $args{tx_id});
1066 58 100       15084 return [409, "Another transaction with that ID exists", undef,
1067             {rollback=>0}] if $r;
1068              
1069             # XXX check for limits
1070              
1071             $dbh->do("INSERT INTO tx (str_id, owner_id, summary, status, ".
1072             "ctime) VALUES (?,?,?,?,?)", {},
1073             $args{tx_id}, $args{client_token}//"", $args{summary}, "i",
1074             $self->{_now})
1075 56 50 50     4121 or return [532, "db: Can't insert tx: ".$dbh->errstr];
1076              
1077 56         45728 $self->{_tx_id} = $args{tx_id};
1078             $self->{_cur_tx} = $dbh->selectrow_hashref(
1079             "SELECT * FROM tx WHERE str_id=?", {}, $args{tx_id})
1080 56 50       1031 or return [532, "db: Can't select tx: ".$dbh->errstr];
1081 56         19981 [200];
1082             },
1083 58         1087 );
1084             }
1085              
1086             sub _action {
1087 77     77   552 my ($self, $actions, $opts) = @_;
1088 77         748 $self->_action_loop('action', $actions, $opts);
1089             }
1090              
1091             # old name, for backward compatibility
1092 0     0   0 sub _call { my $self =shift; $self->_action(@_) }
  0         0  
1093 0     0 0 0 sub call { my $self =shift; $self->action(@_) }
  0         0  
1094              
1095             sub action {
1096 55     55 1 1654394 my ($self, %args) = @_;
1097              
1098 55         292 my ($f, $args, $actions);
1099 55   50     1032 $actions = $args{actions} // [[$args{f}, $args{args}]];
1100 55 50       610 return [304, "No actions to do"] unless @$actions;
1101              
1102             $self->_wrap(
1103             args => \%args,
1104             # we allow calling action() during rollback, since a function can call
1105             # other function using action(), but we don't actually bother to save
1106             # the undo actions.
1107             tx_status => ["i", "d", "u", "a", "v", "e"],
1108             rollback => 0, # _action_loop already does rollback
1109             code => sub {
1110 54     54   364 my $cur_tx = $self->{_cur_tx};
1111 54 50 33     444 if ($cur_tx->{status} ne 'i' && !$self->{_in_rollback}) {
1112 0         0 return $self->_resp_incorrect_tx_status($cur_tx);
1113             }
1114              
1115 54         376 delete $self->{_res};
1116 54         537 my $res = $self->_action($actions, {confirm=>$args{confirm}});
1117 54 100 66     2652 if ($res->[0] != 200 && $res->[0] != 304) {
1118 15 100 100     308 if ($self->{_res} && $self->{_res}[0] !~ /200|304/) {
1119             return [$self->{_res}[0],
1120 3         37 $self->{_res}[1],
1121             undef,
1122             {tx_result=>$res, prev=>$res}];
1123             } else {
1124 12         102 return err(532, {prev=>$res});
1125             }
1126             } else {
1127             return [$self->{_res}[0],
1128             $self->{_res}[1],
1129             $self->{_stash}{result},
1130 39   50     460 { %{ $self->{_stash}{result_meta} // {} },
1131 39   50     491 %{ $res->[3] // {}} }];
  39         689  
1132             }
1133             },
1134 55         1285 );
1135             }
1136              
1137             sub commit {
1138 33     33 1 37732 my ($self, %args) = @_;
1139             $self->_wrap(
1140             args => \%args,
1141             tx_status => ["i", "a"],
1142             code => sub {
1143 33     33   719 my $dbh = $self->{_dbh};
1144 33         141 my $tx = $self->{_cur_tx};
1145 33 50       335 if ($tx->{status} eq 'a') {
1146 0         0 my $res = $self->_rollback;
1147 0 0       0 return $res unless $res->[0] == 200;
1148 0         0 return [200, "Rolled back"];
1149             }
1150             $dbh->do(
1151 33         573 "DELETE FROM do_action WHERE tx_ser_id=?",{},$tx->{ser_id});
1152             $dbh->do("UPDATE tx SET status=?, commit_time=? WHERE ser_id=?",
1153             {}, "C", $self->{_now}, $tx->{ser_id})
1154 33 50       40576 or return [532, "db: Can't update tx status to committed: ".
1155             $dbh->errstr];
1156 33         34030 [200];
1157             },
1158 33         601 );
1159             }
1160              
1161             sub _rollback {
1162 32     32   351 my ($self) = @_;
1163 32         144 my $dbh = $self->{_dbh};
1164 32         129 my $tx = $self->{_cur_tx};
1165              
1166 32         294 my $res = $self->_action_loop('rollback');
1167 32 100       4298 return $res unless $res->[0] == 200;
1168 20         356 $dbh->do("DELETE FROM do_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1169 20         360911 $dbh->do("DELETE FROM undo_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1170 20         274330 [200];
1171             }
1172              
1173             sub _undo {
1174 38     38   173 my ($self, $opts) = @_;
1175 38         222 my $dbh = $self->{_dbh};
1176 38         144 my $tx = $self->{_cur_tx};
1177              
1178 38         242 my $res = $self->_action_loop('undo', undef, $opts);
1179 38 100       1420 return $res unless $res->[0] == 200;
1180 28         527 $dbh->do("DELETE FROM undo_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1181 28         474436 [200];
1182             }
1183              
1184             sub _redo {
1185 24     24   128 my ($self, $opts) = @_;
1186 24         96 my $dbh = $self->{_dbh};
1187 24         88 my $tx = $self->{_cur_tx};
1188              
1189 24         160 my $res = $self->_action_loop('redo', undef, $opts);
1190 24 100       3682 return $res unless $res->[0] == 200;
1191 13         227 $dbh->do("DELETE FROM do_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1192 13         242766 [200];
1193             }
1194              
1195             sub rollback {
1196 7     7 1 107977 my ($self, %args) = @_;
1197             $self->_wrap(
1198             args => \%args,
1199             tx_status => ["i", "a"],
1200             rollback => 0, # _action_loop already does rollback
1201             code => sub {
1202 5     5   45 $self->_rollback;
1203             },
1204 7         103 );
1205             }
1206              
1207             sub prepare {
1208 0     0 1 0 [501, "Not implemented"];
1209             }
1210              
1211             sub savepoint {
1212 0     0 1 0 [501, "Not yet implemented"];
1213             }
1214              
1215             sub release_savepoint {
1216 0     0 1 0 [501, "Not yet implemented"];
1217             }
1218              
1219             sub list {
1220 82     82 1 76908 my ($self, %args) = @_;
1221             $self->_wrap2(
1222             args => \%args,
1223             code => sub {
1224 82     82   382 my $dbh = $self->{_dbh};
1225 82         393 my @wheres = ("1");
1226 82         260 my @params;
1227 82 100       471 if ($args{tx_id}) {
1228 74         309 push @wheres, "str_id=?";
1229 74         328 push @params, $args{tx_id};
1230             }
1231 82 100       450 if ($args{tx_status}) {
1232 6         21 push @wheres, "status=?";
1233 6         23 push @params, $args{tx_status};
1234             }
1235 82         1909 my $sth = $dbh->prepare(
1236             "SELECT * FROM tx WHERE ".join(" AND ", @wheres).
1237             " ORDER BY ctime, ser_id");
1238 82         28049 $sth->execute(@params);
1239 82         457 my @res;
1240 82         8442 while (my $row = $sth->fetchrow_hashref) {
1241 81 100       531 if ($args{detail}) {
1242             push @res, {
1243             tx_id => $row->{str_id},
1244             tx_status => $row->{status},
1245             tx_start_time => $row->{ctime},
1246             tx_commit_time=> $row->{commit_time},
1247             tx_summary => $row->{summary},
1248 74         2517 };
1249             } else {
1250 7         182 push @res, $row->{str_id};
1251             }
1252             }
1253 82         2205 [200, "OK", \@res];
1254             },
1255 82         1573 );
1256             }
1257              
1258             sub undo {
1259 34     34 1 16044 my ($self, %args) = @_;
1260              
1261             # find latest committed tx
1262 34 50       198 unless ($args{tx_id}) {
1263 0         0 my $dbh = $self->{_dbh};
1264 0         0 my @row = $dbh->selectrow_array(
1265             "SELECT str_id FROM tx WHERE status='C' ".
1266             "ORDER BY commit_time DESC, ser_id DESC LIMIT 1");
1267 0 0       0 return [412, "There are no committed transactions to undo"] unless @row;
1268 0         0 $args{tx_id} = $row[0];
1269             }
1270              
1271             $self->_wrap(
1272             args => \%args,
1273             tx_status => ["C"],
1274             rollback => 0, # _action_loop already does rollback
1275             code => sub {
1276 34     34   285 delete $self->{_res};
1277 34         304 my $res = $self->_undo({confirm=>$args{confirm}});
1278 34 100 66     812 if ($res->[0] != 200 && $res->[0] != 304) {
1279 10 50 33     211 if ($self->{_res} && $self->{_res}[0] !~ /200|304/) {
1280             return [$self->{_res}[0],
1281 0         0 $self->{_res}[1],
1282             undef,
1283             {tx_result=>$res, prev=>$res}];
1284             } else {
1285 10         84 return err(532, {prev=>$res});
1286             }
1287             } else {
1288 24         199 return [200];
1289             }
1290             },
1291 34         497 );
1292             }
1293              
1294             sub redo {
1295 19     19 1 40879 my ($self, %args) = @_;
1296              
1297             # find first undone committed tx
1298 19 50       137 unless ($args{tx_id}) {
1299 0         0 my $dbh = $self->{_dbh};
1300 0         0 my @row = $dbh->selectrow_array(
1301             "SELECT str_id FROM tx WHERE status='U' ".
1302             "ORDER BY commit_time ASC, ser_id ASC LIMIT 1");
1303 0 0       0 return [412, "There are no undone transactions to redo"] unless @row;
1304 0         0 $args{tx_id} = $row[0];
1305             }
1306              
1307             $self->_wrap(
1308             args => \%args,
1309             tx_status => ["U"],
1310             rollback => 0, # _action_loop already does rollback
1311             code => sub {
1312 19     19   133 delete $self->{_res};
1313 19         174 my $res = $self->_redo({confirm=>$args{confirm}});
1314 19 100 66     405 if ($res->[0] != 200 && $res->[0] != 304) {
1315 11 50 33     303 if ($self->{_res} && $self->{_res}[0] !~ /200|304/) {
1316             return [$self->{_res}[0],
1317 0         0 $self->{_res}[1],
1318             undef,
1319             {tx_result=>$res, prev=>$res}];
1320             } else {
1321 11         96 return err(532, {prev=>$res});
1322             }
1323             } else {
1324 8         61 return [200];
1325             }
1326             },
1327 19         367 );
1328             }
1329              
1330             sub _discard {
1331 87     87   1001 my ($self, $which, %args) = @_;
1332 87 100       1796 my $wmeth = $which eq 'one' ? '_wrap' : '_wrap2';
1333             $self->$wmeth(
1334             label => $which,
1335             args => \%args,
1336             tx_status => $which eq 'one' ? ['C','U','R','X'] : undef,
1337             code => sub {
1338 76     76   354 my $dbh = $self->{_dbh};
1339 76         269 my $sth;
1340 76 100       363 if ($which eq 'one') {
1341 17         177 $sth = $dbh->prepare("SELECT ser_id FROM tx WHERE str_id=?");
1342 17         2479 $sth->execute($self->{_cur_tx}{str_id});
1343             } else {
1344 59         293 my $txs = "'C','U','R','X'";
1345 59 100       336 if ($args{status}) {
1346 116         1008 $txs = join(",",map{"'$_'"}
1347 58         189 grep {/\A[CURX]\z/} @{$args{status}});
  116         705  
  58         240  
1348             }
1349 59         1102 $sth = $dbh->prepare(
1350             "SELECT ser_id FROM tx WHERE status IN ($txs)");
1351 59         20011 $sth->execute;
1352             }
1353 76         531 my @txs;
1354 76         1446 while (my @row = $sth->fetchrow_array) {
1355 39         468 push @txs, $row[0];
1356             }
1357 76 100       451 if (@txs) {
1358 37         231 my $txs = join(",", @txs);
1359 37 50       496 $dbh->do("DELETE FROM tx WHERE ser_id IN ($txs)")
1360             or return [532, "db: Can't delete tx: ".$dbh->errstr];
1361 37         400091 $dbh->do("DELETE FROM do_action WHERE tx_ser_id IN ($txs)");
1362 37         51774 log_info("$lp discard tx: %s", \@txs);
1363             }
1364 76         3285 [200];
1365             },
1366 87 100       4050 );
1367             }
1368              
1369             sub discard {
1370 28     28 1 65317 my $self = shift;
1371 28         203 $self->_discard('one', @_);
1372             }
1373              
1374             sub discard_all {
1375 59     59 1 5397 my $self = shift;
1376 59         390 $self->_discard('all', @_);
1377             }
1378              
1379             1;
1380             # ABSTRACT: A Rinci transaction manager
1381              
1382             __END__
1383              
1384             =pod
1385              
1386             =encoding UTF-8
1387              
1388             =head1 NAME
1389              
1390             Perinci::Tx::Manager - A Rinci transaction manager
1391              
1392             =head1 VERSION
1393              
1394             This document describes version 0.57 of Perinci::Tx::Manager (from Perl distribution Perinci-Tx-Manager), released on 2017-07-10.
1395              
1396             =head1 SYNOPSIS
1397              
1398             # used by Perinci::Access::Schemeless
1399              
1400             =head1 DESCRIPTION
1401              
1402             This class implements a transaction and undo manager (TM), as specified by
1403             L<Rinci::function::Transaction> and L<Riap::Transaction>. It is meant to be
1404             instantiated by L<Perinci::Access::Schemeless>, but will also be passed to
1405             transactional functions to save undo/redo data.
1406              
1407             It uses an SQLite database to store the transaction list and undo/redo data, as
1408             well as a transaction data directory to provide trash_dir/tmp_dir for functions
1409             that require them.
1410              
1411             =for Pod::Coverage ^(call|get_func_and_meta)$
1412              
1413             =head1 ATTRIBUTES
1414              
1415             =head2 _tx_id
1416              
1417             This is just a convenience so that methods that require tx_id will get the
1418             default value from here if tx_id is not specified in arguments.
1419              
1420             =head1 METHODS
1421              
1422             =head2 new(%args) => OBJ
1423              
1424             Create new object. Arguments:
1425              
1426             =over 4
1427              
1428             =item * pa => OBJ
1429              
1430             Perinci::Access::Schemeless object. This is required by Perinci::Tx::Manager to
1431             load/get functions when it wants to perform undo/redo/recovery.
1432             Perinci::Access::Schemeless conveniently require()s the Perl modules and wraps
1433             the functions.
1434              
1435             =item * data_dir => STR (default C<~/.perinci/.tx>)
1436              
1437             =item * max_txs => INT (default 1000)
1438              
1439             Limit maximum number of transactions maintained by the TM, including all rolled
1440             back and committed transactions, since they are still recorded in the database.
1441             The default is 1000.
1442              
1443             Not yet implemented.
1444              
1445             After this limit is reached, cleanup will be performed to delete rolled back
1446             transactions, and after that committed transactions.
1447              
1448             =item * max_open_txs => INT (default 100)
1449              
1450             Limit maximum number of open (in progress, aborted, prepared) transactions. This
1451             excludes resolved transactions (rolled back and committed). The default is no
1452             limit.
1453              
1454             Not yet implemented.
1455              
1456             After this limit is reached, starting a new transaction will fail.
1457              
1458             =item * max_committed_txs => INT (default 100)
1459              
1460             Limit maximum number of committed transactions that are recorded by the database.
1461             This is equal to the number of undo steps that are remembered.
1462              
1463             After this limit is reached, cleanup will automatically be performed so that
1464             the oldest committed transactions are purged.
1465              
1466             Not yet implemented.
1467              
1468             =item * max_open_age => INT
1469              
1470             Limit the maximum age of open transactions (in seconds). If this limit is
1471             reached, in-progress transactions will automatically be purged because they
1472             have timed out.
1473              
1474             Not yet implemented.
1475              
1476             =item * max_committed_age => INT
1477              
1478             Limit the maximum age of committed transactions (in seconds). If this limit is
1479             reached, the old transactions will start to be purged.
1480              
1481             Not yet implemented.
1482              
1483             =back
1484              
1485             =head2 $tx->get_trash_dir => RESP
1486              
1487             =head2 $tx->get_tmp_dir => RESP
1488              
1489             =head2 $tm->begin(%args) => RESP
1490              
1491             Start a new transaction.
1492              
1493             Arguments: tx_id (str, required, unless already supplied via _tx_id()), twopc
1494             (bool, optional, currently must be false since distributed transactions are
1495             not yet supported), summary (optional).
1496              
1497             TM will create an entry for this transaction in its database.
1498              
1499             =head2 $tm->action(%args) => RESP
1500              
1501             Perform action for the transaction by calling one or more functions.
1502              
1503             Arguments: C<f> (fully-qualified function name), C<args> (arguments to function,
1504             hashref). Or, C<actions> (list of function calls, array, C<[[f1, args1], ...]>,
1505             alternative to specifying C<f> and C<args>), C<confirm> (bool, if set to true
1506             then will pass C<< -confirm => 1 >> special argument to functions; see status
1507             code 331 in L<Rinci::function> for more details on this).
1508              
1509             TM will also pass the following special arguments: C<< -tx_v => PROTO_VERSION
1510             >>, C<< -tx_rollback => 1 >> during rollback, and C<< -tx_recovery => 1 >>
1511             during recovery, for informative purposes.
1512              
1513             To perform a single action, specify C<f> and C<args>. To perform several
1514             actions, supply C<actions>.
1515              
1516             Note: special arguments (those starting with a dash, C<->) will be stripped from
1517             function arguments by TM.
1518              
1519             If response from function is not success, rollback() will be called.
1520              
1521             Tip: To call a function that supports dry-run mode in dry-run mode, or to call a
1522             pure function, you do not have to use TM's action(); you can call the function
1523             directly, since this will not have any side effects.
1524              
1525             Tip: During C<fix_state>, a function can return C<stash> in its result metadata,
1526             which can be set to a hash. This will be collected and passed by TM in the
1527             C<-stash> special argument. This is useful with multiple actions, where one
1528             action might need to check the result of a previous action.
1529              
1530             =head2 $tx->commit(%args) => RESP
1531              
1532             Commit a transaction.
1533              
1534             Arguments: C<tx_id>
1535              
1536             =head2 $tx->rollback(%args) => RESP
1537              
1538             Rollback a transaction.
1539              
1540             Arguments: C<tx_id>, C<sp_id> (optional, savepoint name to rollback to a
1541             specific savepoint only).
1542              
1543             Currently rolling back to a savepoint is not implemented.
1544              
1545             =head2 $tx->prepare(%args) => RESP
1546              
1547             Prepare a transaction.
1548              
1549             Arguments: C<tx_id>
1550              
1551             Currently will return 501 (not implemented). Rinci::Transaction does not yet
1552             support distributed transactions.
1553              
1554             =head2 $tx->savepoint(%args) => RESP
1555              
1556             Declare a savepoint.
1557              
1558             Arguments: C<tx_id>, C<sp_id> (savepoint name).
1559              
1560             Currently not implemented.
1561              
1562             =head2 $tx->release_savepoint(%args) => RESP
1563              
1564             Release (forget) a savepoint.
1565              
1566             Arguments: C<tx_id>, C<sp_id> (savepoint name).
1567              
1568             Currently not implemented.
1569              
1570             =head2 $tx->undo(%args) => RESP
1571              
1572             Undo a committed transaction.
1573              
1574             Arguments: C<tx_id>, C<confirm> (bool, if set to true then will pass C<<
1575             -confirm => 1 >> special argument to functions; see status code 331
1576             in L<Rinci::function> for more details on this).
1577              
1578             =head2 $tx->redo(%args) => RESP
1579              
1580             Redo an undone committed transaction.
1581              
1582             Arguments: C<tx_id>, C<confirm> (bool, if set to true then will pass C<<
1583             -confirm => 1 >> special argument to functions; see status code 331
1584             in L<Rinci::function> for more details on this).
1585              
1586             =head2 $tx->list(%args) => RESP
1587              
1588             List transactions.
1589              
1590             Arguments: B<detail> (bool, default 0, whether to return transaction records
1591             instead of just a list of transaction IDs).
1592              
1593             Return an array of results sorted by creation date (in ascending order).
1594              
1595             =head2 $tx->discard(%args) => RESP
1596              
1597             Discard (forget) a client's committed transaction.
1598              
1599             Arguments: C<tx_id>
1600              
1601             Transactions that can be discarded are committed, undone committed, or
1602             inconsistent ones (i.e., those with final statuses C<C>, C<U>, C<X>).
1603              
1604             =head2 $tm->discard_all(%args) => RESP
1605              
1606             Discard (forget) all committed transactions.
1607              
1608             =head1 HOMEPAGE
1609              
1610             Please visit the project's homepage at L<https://metacpan.org/release/Perinci-Tx-Manager>.
1611              
1612             =head1 SOURCE
1613              
1614             Source repository is at L<https://github.com/perlancar/perl-Perinci-Tx-Manager>.
1615              
1616             =head1 BUGS
1617              
1618             Please report any bugs or feature requests on the bugtracker website L<https://rt.cpan.org/Public/Dist/Display.html?Name=Perinci-Tx-Manager>
1619              
1620             When submitting a bug or request, please include a test-file or a
1621             patch to an existing test-file that illustrates the bug or desired
1622             feature.
1623              
1624             =head1 SEE ALSO
1625              
1626             L<Rinci::Transaction>
1627              
1628             L<Perinci::Access::Schemeless>
1629              
1630             =head1 AUTHOR
1631              
1632             perlancar <perlancar@cpan.org>
1633              
1634             =head1 COPYRIGHT AND LICENSE
1635              
1636             This software is copyright (c) 2017, 2016, 2015, 2014, 2013, 2012 by perlancar@cpan.org.
1637              
1638             This is free software; you can redistribute it and/or modify it under
1639             the same terms as the Perl 5 programming language system itself.
1640              
1641             =cut