File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Shared/Memory.pm
Criterion Covered Total %
statement 24 112 21.4
branch 0 32 0.0
condition 0 27 0.0
subroutine 8 34 23.5
pod 1 14 7.1
total 33 219 15.0


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Shared::Memory;
2              
3 38     38   276 use Mojo::IOLoop::ReadWriteProcess::Shared::Lock;
  38         104  
  38         1947  
4 38     38   220 use Mojo::Base -base;
  38         64  
  38         443  
5              
6 38     38   7253 use Carp qw(croak confess);
  38         109  
  38         2310  
7 38     38   224 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  38         59  
  38         2784  
8 38     38   18932 use IPC::SharedMem;
  38         94448  
  38         1254  
9 38     38   258 use Config;
  38         78  
  38         1646  
10             use IPC::SysV
11 38     38   224 qw(ftok IPC_PRIVATE IPC_NOWAIT IPC_CREAT IPC_EXCL S_IRUSR S_IWUSR S_IRGRP S_IWGRP S_IROTH S_IWOTH SEM_UNDO S_IRWXU S_IRWXG);
  38         74  
  38         4010  
12              
13             our @EXPORT_OK = qw(shared_memory shared_lock semaphore);
14 38     38   242 use Exporter 'import';
  38         76  
  38         83328  
15              
16             has key => sub { Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore::_genkey() };
17             has 'buffer';
18             has destroy => 0;
19             has flags => S_IRWXU() | S_IRWXG() | IPC_CREAT();
20             has lock_flags => IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP
21             | S_IROTH | S_IWOTH;
22             has _size => 10 * 1024;
23             has _shared_memory => sub { $_[0]->_newmem() };
24             has _shared_size =>
25             sub { $_[0]->_newmem((2 * shift->key) - 1, $Config{intsize}) };
26             has _lock => sub {
27             Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(
28             flags => $_[0]->lock_flags,
29             key => (2 * shift->key) + 1
30             );
31             };
32              
33             has dynamic_resize => 1;
34             has dynamic_decrement => 1;
35             has dynamic_increment => 1;
36              
37 0     0 0   sub shared_lock { Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(@_) }
38 0     0 0   sub semaphore { Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore->new(@_) }
39 0     0 0   sub shared_memory { __PACKAGE__->new(@_) }
40              
41             sub new {
42 0     0 1   my $s = shift->SUPER::new(@_);
43 0 0         confess 'Could not allocate shared size memory ' . $s->key
44             unless $s->_shared_size;
45 0           $s->_loadsize;
46 0 0         confess 'Could not allocate shared memory with key ' . $s->key
47             unless $s->_shared_memory;
48 0           return $s;
49             }
50              
51 0     0     sub _encode_content { $_[0]->buffer(unpack 'H*', shift->buffer()) }
52 0     0     sub _decode_content { $_[0]->buffer(pack 'H*', shift->buffer()) }
53              
54             sub _writesize {
55 0     0     my $self = shift;
56 0           my $size = shift;
57 0           $self->_shared_size()->write(pack('I', $size), 0, $Config{intsize});
58             }
59              
60             sub _readsize {
61 0     0     my $self = shift;
62 0           my $s = $self->_shared_size()->read(0, $Config{intsize});
63 0           return unpack('I', $s);
64             }
65              
66             sub _loadsize {
67 0     0     my $s = $_[0]->_readsize;
68 0           my $cur_size = $_[0]->_size;
69 0 0         $s = $_[0]->_size if $s == 0;
70 0 0         $_[0]->_size($s =~ /\d/ ? $s : $_[0]->_size);
71 0 0 0       $_[0]->_writesize($_[0]->_size) and $_[0]->_shared_memory($_[0]->_newmem)
72             if $s != $cur_size;
73              
74 0           warn "[debug:$$] Mem size: " . $_[0]->_size if DEBUG;
75             }
76              
77             sub _reload {
78 0     0     $_[0]->_shared_memory($_[0]->_newmem);
79 0           $_[0]->_shared_memory($_[0]->_newmem) until defined $_[0]->_shared_memory;
80             }
81              
82             # Must be run in a locked section
83             sub resize {
84 0     0 0   my $self = shift;
85 0           $self->_shared_memory->detach();
86 0           1 until $self->_safe_remove;
87 0   0       $self->_size($_[0] // length $self->buffer);
88 0           $self->_reload;
89              
90             # XXX: is faster to re-allocate the shared memory with shmctl, but SHM_SIZE
91             # seems to not be really portable:
92             # shmctl $_[0]->_shared_memory->id, SHM_SIZE, struct
93             # $_[0]->_writesize($_[1] // length $_[0]->buffer ) if $_[0]->_shared_memory;
94             }
95              
96             # Must be run in a locked section
97             sub _sync_size {
98 0     0     warn "[debug:$$] Sync size for content ("
99             . length($_[0]->buffer)
100             . ") vs currently allocated ("
101             . $_[0]->_size . ")"
102             if DEBUG;
103 0           $_[0]->resize;
104             }
105              
106             sub save {
107 0     0 0   warn "[debug:$$] Writing data : " . $_[0]->buffer if DEBUG;
108              
109 0           $_[0]->_encode_content;
110              
111 0           eval {
112             # Resize
113 0 0 0       $_[0]->_sync_size
      0        
114             if (
115             $_[0]->dynamic_resize && (
116             (
117             $_[0]->dynamic_increment
118             && (defined $_[0]->buffer && length $_[0]->buffer > $_[0]->_size)
119             ) # Increment
120             || ($_[0]->dynamic_decrement
121             && (defined $_[0]->buffer && $_[0]->_size > length $_[0]->buffer)
122             ) # Decrement
123             ));
124 0 0         $_[0]->_writesize($_[0]->_size) if $_[0]->_shared_memory();
125              
126             # $_[0]->_reload;
127              
128 0 0         $_[0]->_shared_memory()->write($_[0]->buffer, 0, $_[0]->_size)
129             if $_[0]->_shared_memory();
130             };
131              
132 0 0 0       warn "[debug:$$] Error Saving data : $@" if $@ && DEBUG;
133              
134 0 0         $_[0]->_shared_memory->detach() if $_[0]->_shared_memory;
135 0 0         return if $@;
136 0           return 1;
137             }
138              
139             sub _newmem {
140 0   0 0     IPC::SharedMem->new(
      0        
141             $_[1] // $_[0]->key(),
142             $_[2] // $_[0]->_size,
143             $_[0]->flags
144             );
145             }
146              
147             sub load {
148              
149 0     0 0   eval {
150 0           $_[0]->_loadsize;
151 0           warn "[debug:$$] Reading " . $_[0]->_size if DEBUG;
152 0           $_[0]->_reload;
153 0           $_[0]->_shared_memory->attach();
154 0           $_[0]->buffer($_[0]->_shared_memory()->read(0, $_[0]->_size));
155              
156             # XXX: Remove the 0 padding?
157             # substr($_[0]->{buffer}, index($_[0]->{buffer}, "\0")) = "";
158 0           $_[0]->_decode_content;
159             };
160              
161 0 0 0       warn "[debug:$$] Error Loading data : $@" if $@ && DEBUG;
162 0 0         return if $@;
163 0           return 1;
164             }
165              
166             sub _safe_remove {
167 0     0     my $self = shift;
168 0           my $stat = $self->_shared_memory()->stat();
169 0 0 0       if (defined($stat) && ($stat->nattch() == 0)) {
170 0           $self->_shared_memory()->remove();
171 0           return 1;
172             }
173 0           return 0;
174             }
175              
176             sub remove {
177 0     0 0   my $self = shift;
178 0           $self->_shared_memory->detach();
179 0           $self->_lock->remove;
180 0           $self->_shared_size()->remove();
181 0           return $self->_safe_remove;
182             }
183              
184             sub clean {
185 0     0 0   my $self = shift;
186 0     0     $self->lock_section(sub { $self->buffer(' ')->save });
  0            
187             }
188              
189             sub unlock {
190 0     0 0   eval { $_[0]->save };
  0            
191 0           shift->_lock->unlock(@_);
192             }
193              
194 0     0 0   sub lock { my $s = shift; my $r = $s->_lock->lock(@_); $s->load; $r }
  0            
  0            
  0            
195 0     0 0   sub try_lock { $_[0]->_lock->try_lock() }
196              
197             sub lock_section {
198 0     0 0   my ($self, $fn) = @_;
199              
200             return $self->_lock->lock_section(
201             sub {
202 0     0     my $r;
203             {
204 0           $self->load;
  0            
205 0           local $@;
206 0           $r = eval { $fn->() };
  0            
207 0 0 0       warn "[debug:$$] Error inside locked memory section : $@"
208             if $@ && DEBUG;
209 0           eval { $self->save };
  0            
210             };
211 0           return $r;
212 0           });
213             }
214              
215 0     0 0   sub stat { shift->_shared_memory->stat }
216              
217 0 0   0     sub DESTROY { $_[0]->remove if $_[0]->destroy() }
218              
219             !!42;