File Coverage

blib/lib/EMDIS/ECS/FileBackedMessage.pm
Criterion Covered Total %
statement 175 364 48.0
branch 71 256 27.7
condition 6 48 12.5
subroutine 24 27 88.8
pod 0 19 0.0
total 276 714 38.6


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2             #
3             # Copyright (C) 2010-2020 National Marrow Donor Program. All rights reserved.
4             #
5             # For a description of this module, please refer to the POD documentation
6             # embedded at the bottom of the file (e.g. perldoc EMDIS::ECS::FileBackedMessage).
7              
8             package EMDIS::ECS::FileBackedMessage;
9              
10 1         229 use EMDIS::ECS qw($ECS_CFG $ECS_NODE_TBL $FILEMODE $VERSION ecs_is_configured
11             format_datetime format_doc_filename format_msg_filename
12             log_debug log_info log_warn log_error log_fatal
13             send_amqp_message send_encrypted_message send_email
14 1     1   12625 dequote trim);
  1         5  
15 1     1   9 use Fcntl qw(:DEFAULT :flock);
  1         3  
  1         369  
16 1     1   10 use File::Basename;
  1         37  
  1         72  
17 1     1   8 use File::Spec::Functions qw(catdir catfile);
  1         2  
  1         64  
18 1     1   7 use File::Temp qw(tempfile);
  1         2  
  1         43  
19 1     1   5 use IO::File;
  1         25  
  1         242  
20 1     1   7 use strict;
  1         2  
  1         4307  
21              
22             # ----------------------------------------------------------------------
23             # Constructor.
24             # If error encountered, returns error message instead of object reference.
25             sub new
26             {
27 29     29 0 1441 my $arg1 = shift;
28 29         53 my $this;
29 29 50       70 if(ref $arg1)
30             {
31             # invoked as instance method
32 0         0 $this = $arg1;
33             }
34             else
35             {
36             # invoked as class method
37 29         58 $this = {};
38 29         58 bless $this, $arg1;
39             }
40              
41 29         47 my $err = '';
42 29         52 my ($sender_node_id, $seq_num, $filename);
43 29         52 my $argc = scalar(@_);
44 29 100       67 if($argc <= 1)
    50          
45             {
46 20         37 $filename = shift;
47             }
48             elsif($argc == 3)
49             {
50 9         28 ($sender_node_id, $seq_num, $filename) = @_;
51 9 100       32 $this->{sender_node_id} = $sender_node_id if $sender_node_id;
52 9 100       21 $this->{seq_num} = $seq_num if $seq_num;
53             }
54             else
55             {
56 0         0 return "Illegal usage -- expected 0, 1, or 3 parameters: " .
57             "[filename] or <sender_node_id>, <seq_num>, <filename>";
58             }
59              
60             # set presumed message type flags - can be overridden by email headers or subject
61 29         67 $this->{is_ecs_message} = 1;
62 29         55 $this->{is_meta_message} = '';
63 29         48 $this->{is_document} = '';
64 29 100 66     271 if(defined $filename and $filename =~ /(\.doc|\.doc\.xml)$/io)
65             {
66             # filename ending in .doc or .doc.xml indicates document (not message)
67 2         6 $this->{is_ecs_message} = '';
68 2         4 $this->{is_meta_message} = '';
69 2         2 $this->{is_document} = 1;
70             }
71              
72 29         61 $this->{temp_files} = [];
73 29         54 $this->{is_closed} = 0;
74              
75             # if $filename not specified, read input from stdin
76 29 50       60 if(not $filename)
77             {
78             # read from stdin, create temp file
79 0         0 my $template = sprintf('%s_XXXX', format_datetime(time,
80             '%04d%02d%02d_%02d%02d%02d'));
81 0 0       0 return "Unable to create temp file from stdin: ECS is not configured!"
82             unless ecs_is_configured();
83 0         0 my $fh;
84 0         0 ($fh, $filename) = tempfile($template,
85             DIR => catdir($ECS_CFG->ECS_TMP_DIR),
86             SUFFIX => '.msg');
87 0         0 binmode(STDIN);
88 0         0 binmode($fh);
89 0         0 while(1)
90             {
91 0         0 my $buffer;
92              
93 0         0 my $readlen = read STDIN, $buffer, 65536;
94 0 0       0 if(not defined $readlen)
95             {
96 0         0 $err = "Unexpected problem reading STDIN: $!";
97 0         0 last;
98             }
99              
100 0 0       0 last if $readlen == 0;
101              
102 0 0       0 if(not print $fh $buffer)
103             {
104 0         0 $err = "Unexpected problem writing file $filename: $!";
105 0         0 last;
106             }
107             }
108 0         0 close $fh;
109 0 0       0 if($err)
110             {
111 0         0 unlink $filename;
112 0         0 return $err;
113             }
114 0         0 push @{$this->{temp_files}}, $filename;
  0         0  
115             }
116              
117 29         56 $this->{filename} = $filename;
118 29         43 my $file_handle;
119 29 50       1114 return "Unable to open input file $filename: $!"
120             unless open $file_handle, "+< $filename";
121 29         138 $this->{file_handle} = $file_handle;
122 29         82 binmode $file_handle;
123              
124             # get exclusive lock (with retry loop)
125             # protects against reading a file while another process is writing it
126 29         49 my $locked = '';
127 29         80 for my $retry (1..5)
128             {
129 29         247 $locked = flock $file_handle, LOCK_EX | LOCK_NB;
130 29 50       97 last if $locked;
131             }
132 29 50       64 if(!$locked)
133             {
134 0         0 $err = "Unable to lock input file $filename: $!";
135 0         0 close $file_handle;
136 0         0 return $err;
137             }
138              
139 29         48 my $email_headers = '';
140 29         45 my $data_offset = 0;
141              
142             # attempt to read email headers only if sender_node_id not yet defined
143 29 100       65 if(not exists $this->{sender_node_id})
144             {
145             # attempt to read email headers from file, determine data offset
146 21         30 my $buf;
147 21         33 while(1)
148             {
149 5170         8124 my $bytecount = read $file_handle, $buf, 1;
150              
151 5170 50       8557 if(not defined $bytecount)
152             {
153 0         0 $err = "Unexpected problem reading from file $filename: $!";
154 0         0 last;
155             }
156              
157 5170 50 0     7424 if($bytecount > 0)
    0          
158             {
159 5170         6686 $email_headers .= $buf;
160 5170         6207 $data_offset++;
161              
162             # first empty line ends potential email header
163 5170 100       8966 last if $email_headers =~ /\r?\n\r?\n$/so;
164             }
165             elsif($bytecount == 0 or $data_offset >= 1048576)
166             {
167             # assume file does not contain email header
168             # if EOF encountered or no empty line found in first X bytes
169 0         0 $data_offset = 0;
170 0         0 last;
171             }
172             }
173 21 50       49 if($err)
174             {
175 0         0 close $file_handle;
176 0         0 return $err;
177             }
178             }
179              
180 29 100       58 if($data_offset > 0)
181             {
182             # convert headers to more easily parseable format, store in this obj
183 21         218 $email_headers =~ s/\r?\n/\n/go;
184              
185             # look for "Subject" line
186 21 100       137 if($email_headers =~ /^Subject:\s*(.+?)$/imo)
187             {
188 20         103 $this->{subject} = $1;
189 20         39 $this->{email_headers} = $email_headers;
190 20         34 $this->{data_offset} = $data_offset;
191             }
192             }
193              
194             # absence of "Subject" line indicates file contains data only
195 29 100       62 if(not exists $this->{subject})
196             {
197 9         21 $this->{data_offset} = 0;
198 9         64 return $this;
199             }
200              
201             # parse "Subject" into MAIL_MRK:sender_node_id[:seqnum]
202 20         33 my $mail_mrk = 'EMDIS';
203 20 50       68 if(ecs_is_configured())
204             {
205 0         0 $mail_mrk = $ECS_CFG->MAIL_MRK;
206             }
207             else
208             {
209 20         788 warn "ECS not configured, using MAIL_MRK = '$mail_mrk'\n";
210             }
211 20 100       319 if($this->{subject} =~ /$mail_mrk:(\S+?):(\d+)(:(\d+)\/(\d+))?\s*$/i)
    100          
    100          
212             {
213             # regular message
214 10         22 $this->{is_ecs_message} = 1;
215 10         20 $this->{is_meta_message} = '';
216 10         14 $this->{is_document} = '';
217 10         32 $this->{sender_node_id} = $1;
218 10         49 $this->{seq_num} = $2;
219 10 100       53 $this->{part_num} = $4 if defined $4;
220 10 100       30 $this->{num_parts} = $5 if defined $5;
221 10 100 66     53 if(exists $this->{part_num} and exists $this->{num_parts}
      66        
222             and $this->{part_num} > $this->{num_parts})
223             {
224 1         18 close $file_handle;
225 1         9 return "part_num is greater than num_parts: " . $this->{subject};
226             }
227             }
228             elsif($this->{subject} =~ /$mail_mrk:(\S+?):(\d+):DOC\s*$/io) {
229             # document
230 1         4 $this->{sender_node_id} = $1;
231 1         4 $this->{is_ecs_message} = '';
232 1         4 $this->{is_meta_message} = '';
233 1         3 $this->{is_document} = 1;
234 1         3 $this->{seq_num} = $2;
235             }
236             elsif($this->{subject} =~ /$mail_mrk:(\S+)\s*$/i)
237             {
238             # meta-message
239 4         17 $this->{sender_node_id} = $1;
240 4         5 $this->{is_ecs_message} = 1;
241 4         9 $this->{is_meta_message} = 1;
242 4         9 $this->{is_document} = '';
243             }
244             else
245             {
246             # subject line indicates this is not an ECS message or document
247 5         11 $this->{is_ecs_message} = '';
248 5         9 $this->{is_meta_message} = '';
249 5         10 $this->{is_document} = '';
250             }
251              
252 19 50       42 return $err if $err;
253              
254 19         99 return $this;
255             }
256              
257             # ----------------------------------------------------------------------
258             # prepare for object destruction: close $this->{file_handle}, delete
259             # temp files
260             sub cleanup
261             {
262 29     29 0 41 my $this = shift;
263 29 50       57 die "cleanum() must only be called as an instance method!"
264             unless ref $this;
265             close $this->{file_handle}
266 29 50       406 if exists $this->{file_handle};
267 29         75 foreach my $temp_file (@{$this->{temp_files}})
  29         91  
268             {
269 0         0 unlink $temp_file;
270             }
271 29         97 $this->{is_closed} = 1;
272             }
273              
274             # ----------------------------------------------------------------------
275             # Accessor method (read-only).
276             sub data_offset
277             {
278 4     4 0 10 my $this = shift;
279 4 50       10 die "data_offset() must only be called as an instance method!"
280             unless ref $this;
281 4         22 return $this->{data_offset};
282             }
283              
284             # ----------------------------------------------------------------------
285             # Accessor method (read-only).
286             sub email_headers
287             {
288 4     4 0 216 my $this = shift;
289 4 50       11 die "email_headers() must only be called as an instance method!"
290             unless ref $this;
291 4         21 return $this->{email_headers};
292             }
293              
294             # ----------------------------------------------------------------------
295             # Accessor method (read-only).
296             sub filename
297             {
298 1     1 0 350 my $this = shift;
299 1 50       5 die "filename() must only be called as an instance method!"
300             unless ref $this;
301 1         5 return $this->{filename};
302             }
303              
304             # ----------------------------------------------------------------------
305             # Accessor method (read-only).
306             sub hub_rcv
307             {
308 2     2 0 5 my $this = shift;
309 2 50       6 die "hub_rcv() must only be called as an instance method!"
310             unless ref $this;
311 2         7 return $this->{hub_rcv};
312             }
313              
314             # ----------------------------------------------------------------------
315             # Accessor method (read-only).
316             sub hub_snd
317             {
318 2     2 0 4 my $this = shift;
319 2 50       6 die "hub_snd() must only be called as an instance method!"
320             unless ref $this;
321 2         8 return $this->{hub_snd};
322             }
323              
324             # ----------------------------------------------------------------------
325             # Accessor method (read-only).
326             sub is_ecs_message
327             {
328 20     20 0 784 my $this = shift;
329 20 50       51 die "is_ecs_message() must only be called as an instance method!"
330             unless ref $this;
331 20         95 return $this->{is_ecs_message};
332             }
333              
334             # ----------------------------------------------------------------------
335             # Accessor method (read-only).
336             sub is_meta_message
337             {
338 16     16 0 30 my $this = shift;
339 16 50       44 die "is_meta_message() must only be called as an instance method!"
340             unless ref $this;
341 16         71 return $this->{is_meta_message};
342             }
343              
344             # ----------------------------------------------------------------------
345             # Accessor method (read-only).
346             sub is_document
347             {
348 3     3 0 7 my $this = shift;
349 3 50       11 die "is_document() must only be called as an instance method!"
350             unless ref $this;
351 3         12 return $this->{is_document};
352             }
353              
354             # ----------------------------------------------------------------------
355             # Accessor method (read-only).
356             sub num_parts
357             {
358 3     3 0 7 my $this = shift;
359 3 50       7 die "num_parts() must only be called as an instance method!"
360             unless ref $this;
361 3         13 return $this->{num_parts};
362             }
363              
364             # ----------------------------------------------------------------------
365             # Accessor method (read-only).
366             sub part_num
367             {
368 3     3 0 8 my $this = shift;
369 3 50       9 die "part_num() must only be called as an instance method!"
370             unless ref $this;
371 3         21 return $this->{part_num};
372             }
373              
374             # ----------------------------------------------------------------------
375             # Accessor method (read-only).
376             sub sender
377             {
378 0     0 0 0 my $this = shift;
379 0 0       0 die "sender() must only be called as an instance method!"
380             unless ref $this;
381 0         0 return $this->{sender_node_id};
382             }
383              
384             # ----------------------------------------------------------------------
385             # Accessor method (read-only).
386             sub sender_node_id
387             {
388 14     14 0 29 my $this = shift;
389 14 50       74 die "sender_node_id() must only be called as an instance method!"
390             unless ref $this;
391 14         68 return $this->{sender_node_id};
392             }
393              
394             # ----------------------------------------------------------------------
395             # Accessor method (read-only).
396             sub seq_num
397             {
398 10     10 0 22 my $this = shift;
399 10 50       52 die "seq_num() must only be called as an instance method!"
400             unless ref $this;
401 10         50 return $this->{seq_num};
402             }
403              
404             # ----------------------------------------------------------------------
405             # Accessor method (read-only).
406             sub subject
407             {
408 8     8 0 432 my $this = shift;
409 8 50       22 die "subject() must only be called as an instance method!"
410             unless ref $this;
411 8         46 return $this->{subject};
412             }
413              
414             # ----------------------------------------------------------------------
415             # Accessor method (read only)
416             sub temp_files
417             {
418 0     0 0 0 my $this = shift;
419 0 0       0 die "temp_files() must only be called as an instance method!"
420             unless ref $this;
421 0         0 return @{$this->{temp_files}};
  0         0  
422             }
423              
424             # ----------------------------------------------------------------------
425             # object destructor, called by perl garbage collector
426             sub DESTROY
427             {
428 56     56   158 my $this = shift;
429 56 50       167 die "DESTROY() must only be called as an instance method!"
430             unless ref $this;
431 56 100       352 $this->cleanup unless $this->{is_closed};
432             }
433              
434             # ----------------------------------------------------------------------
435             # read first portion of message, attempt to extract HUB_SND and HUB_RCV
436             # (deprecated -- may be called explicitly, but is no longer used by FileBackedMessage constructor)
437             sub inspect_fml
438             {
439 6     6 0 20 my $this = shift;
440 6 50       16 return "inspect_fml() must only be called as an instance method!"
441             unless ref $this;
442             return "inspect_fml(): this FileBackedMessage object is closed!"
443 6 50       34 if $this->{is_closed};
444              
445             # read first part of FML payload, look for HUB_SND, HUB_RCV
446              
447             return "Unable to position file pointer for file $this->{filename}" .
448             " to position $this->{data_offset}: $!"
449 6 50       66 unless seek $this->{file_handle}, $this->{data_offset}, 0;
450 6         16 my $fml;
451 6         134 my $bytecount = read $this->{file_handle}, $fml, 65536;
452 6 50       20 return "Unable to read from file " . $this->{filename} . ": $!"
453             unless defined $bytecount;
454              
455             # compute is_ecs_message and is_meta_message
456 6 100       48 if($fml =~ /^\s*(BLOCK_BEGIN\s+\w+\s*;\s*)?\w+\s*:/iso)
    100          
457             {
458 4         8 $this->{is_ecs_message} = 1;
459 4         8 $this->{is_meta_message} = '';
460 4         8 $this->{is_document} = '';
461             }
462             elsif($fml =~ /^\s*msg_type\s*=\s*\S+/isom)
463             {
464 1         4 $this->{is_ecs_message} = 1;
465 1         3 $this->{is_meta_message} = 1;
466 1         3 $this->{is_document} = '';
467 1         5 return '';
468             }
469             else
470             {
471 1         4 $this->{is_ecs_message} = '';
472 1         3 $this->{is_meta_message} = '';
473 1         2 $this->{is_document} = '';
474 1         5 return '';
475             }
476              
477             # Note: this code only understands the simple forms of FML assignments
478             # (not the extended /FIELDS form)
479              
480             # look for HUB_RCV
481 4 100       19 if($fml =~ /HUB_RCV\s*=\s*([^,;]+)/iso) # presumes [^,;] in HUB_RCV
482             {
483 3         16 $this->{hub_rcv} = dequote(trim($1));
484             }
485              
486             # look for HUB_SND
487 4 100       35 if($fml =~ /HUB_SND\s*=\s*([^,;]+)/iso) # presumes [^,;] in HUB_SND
488             {
489 3         10 $this->{hub_snd} = dequote(trim($1));
490             }
491              
492 4         13 return '';
493             }
494              
495             # ----------------------------------------------------------------------
496             sub send_this_message
497             {
498 0     0 0   my $this = shift;
499 0 0         return "send_this_message() must only be called as an instance method!"
500             unless ref $this;
501             return "send_this_message(): this FileBackedMessage object is closed!"
502 0 0         if $this->{is_closed};
503             return "send_this_message(): this FileBackedMessage object represents " .
504             "only a partial message!"
505 0 0 0       if defined $this->{num_parts} and $this->{num_parts} > 1;
506              
507             # initialize
508 0           my $rcv_node_id = shift;
509 0           my $is_re_send = shift;
510 0           my $part_num = shift;
511 0 0         return "send_this_message(): ECS has not been configured."
512             unless ecs_is_configured();
513 0           my $cfg = $ECS_CFG;
514 0           my $node_tbl = $ECS_NODE_TBL;
515 0           my $err = '';
516              
517 0 0 0       return "send_this_message(): Missing \$rcv_node_id!"
518             unless defined $rcv_node_id and $rcv_node_id;
519              
520             # lock node_tbl, look up $rcv_node_id
521 0           my $was_locked = $node_tbl->LOCK;
522 0 0         if(not $was_locked)
523             {
524 0 0         $node_tbl->lock() # lock node_tbl
525             or return "send_this_message(): unable to lock node_tbl: " .
526             $node_tbl->ERROR;
527             }
528 0           my $node = $node_tbl->read($rcv_node_id);
529 0 0         if(not $node)
530             {
531 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
532 0           return "send_this_message(): node not found: $rcv_node_id";
533             }
534 0 0         if(not $node->{addr})
535             {
536 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
537 0           return "send_this_message(): addr not defined for node: $rcv_node_id";
538             }
539              
540             # compute or assign message seq_num
541 0           my $seq_num = '';
542 0 0 0       if($is_re_send and not $this->{is_document})
    0 0        
    0          
    0          
543             {
544             # sanity checks
545 0 0         if(not defined $this->{seq_num})
546             {
547 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
548 0           return "send_this_message(): seq_num not defined for RE_SEND";
549             }
550 0 0         if($this->{seq_num} > $node->{out_seq})
551             {
552 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
553             return "send_this_message(): seq_num for RE_SEND (" .
554             $this->{seq_num} . ") is greater than out_seq for node " .
555 0           "$rcv_node_id (" . $node->{out_seq} . ")!";
556             }
557 0           $seq_num = $this->{seq_num};
558             }
559             elsif($is_re_send and $this->{is_document})
560             {
561             # sanity checks
562 0 0         if(not defined $this->{seq_num})
563             {
564 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
565 0           return "send_this_message(): seq_num not defined for DOC_RE_SEND";
566             }
567 0 0         if($this->{seq_num} > $node->{doc_out_seq})
568             {
569 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
570             return "send_this_message(): seq_num for DOC_RE_SEND (" .
571             $this->{seq_num} . ") is greater than doc_out_seq for node " .
572 0           "$rcv_node_id (" . $node->{doc_out_seq} . ")!";
573             }
574 0           $seq_num = $this->{seq_num};
575             }
576             elsif($this->{is_document})
577             {
578             # automatically get next (doc) sequence number
579 0           $node->{doc_out_seq}++;
580 0           $seq_num = $node->{doc_out_seq};
581             }
582             elsif(not $this->{is_meta_message})
583             {
584             # only allow $part_num to be specified if this is a RE_SEND request
585 0 0         if($part_num)
586             {
587 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
588 0           return "send_this_message(): part_num specified ($part_num), for " .
589             "non- RE_SEND request!";
590             }
591             # automatically get next (msg) sequence number
592 0           $node->{out_seq}++;
593 0           $seq_num = $node->{out_seq};
594             }
595              
596             # compute message part size
597 0           my $msg_part_size = $cfg->MSG_PART_SIZE_DFLT;
598 0 0 0       if(defined $node->{msg_part_size} and $node->{msg_part_size} > 0)
599             {
600 0           $msg_part_size = $node->{msg_part_size};
601             }
602              
603             # compute data size
604 0           my $file_size = (stat $this->{file_handle})[7];
605 0           my $data_size = $file_size - $this->{data_offset};
606 0 0         if($data_size <= 0)
607             {
608 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
609 0           return "send_this_message(): data_size is <= 0 ($data_size)!";
610             }
611              
612             # for document, force num_parts = 1
613 0 0         if($this->{is_document})
614             {
615 0           $msg_part_size = $data_size;
616             }
617              
618             # compute num_parts
619 0           my $num_parts = int($data_size / $msg_part_size);
620 0 0         $num_parts++ if ($data_size % $msg_part_size) > 0;
621             # num_parts should be 1 for meta message
622 0 0 0       if($this->{is_meta_message} and $num_parts > 1)
623             {
624 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
625 0           return "send_this_message(): num_parts cannot be > 1 for meta message!";
626             }
627             # $part_num cannot be greater than $num_parts
628 0 0 0       if(defined $part_num and $part_num and $part_num > $num_parts)
      0        
629             {
630 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
631 0           return "send_this_message(): part_num ($part_num) cannot be greater " .
632             "than num_parts ($num_parts)!";
633             }
634              
635             # compute base subject
636 0           my $subject = $cfg->MAIL_MRK . ':' . $cfg->THIS_NODE;
637 0 0         $subject .= ":$seq_num" if $seq_num;
638 0 0         $subject .= ":DOC" if $this->{is_document};
639              
640 0 0         if($is_re_send)
641             {
642             # to save disk space, don't copy message to file for RE_SEND
643 0 0         log_info("send_this_message(): transmitting $rcv_node_id RE_SEND " .
644             "message $seq_num" . ($part_num ? ":$part_num" : '') . "\n");
645             }
646             else
647             {
648             # copy message to file (for non- RE_SEND)
649              
650 0           my $filename;
651              
652 0 0         if($this->{is_meta_message})
653             {
654             # copy meta message to mboxes/out subdirectory
655 0           $filename = sprintf("%s_%s_%s.msg",
656             $cfg->THIS_NODE, $rcv_node_id, "META");
657 0           my $dirname = $cfg->ECS_MBX_OUT_DIR;
658             # create directory if it doesn't already exist
659 0 0         mkdir $dirname unless -e $dirname;
660 0           $filename = catfile($dirname, $filename);
661             }
662             else
663             {
664             # copy regular message or document file to mboxes/out_NODE subdirectory
665 0 0         if($this->{is_document})
666             {
667 0           $filename = format_doc_filename($rcv_node_id, $seq_num);
668             }
669             else
670             {
671 0           $filename = format_msg_filename($rcv_node_id, $seq_num);
672             }
673             # create directory if it doesn't already exist
674 0           my $dirname = dirname($filename);
675 0 0         mkdir $dirname unless -e $dirname;
676             }
677              
678             # don't overwrite $filename file if it already exists
679 0           my $fh;
680 0 0         if(-e $filename)
681             {
682 0           my $template = $filename . "_XXXXXX";
683 0           ($fh, $filename) = tempfile($template);
684 0 0         return "send_this_message(): unable to open _XXXX file: " .
685             "$filename"
686             unless $fh;
687             }
688             else
689             {
690 0           $fh = new IO::File;
691 0 0         return "send_this_message(): unable to open file: " .
692             "$filename"
693             unless $fh->open("> $filename");
694             }
695              
696 0           print $fh "Subject: $subject\n";
697 0           print $fh "To: $node->{addr}\n";
698 0           print $fh "From: " . $cfg->SMTP_FROM . "\n\n";
699             # copy data to $fh
700             $err = "Unable to position file pointer for file $this->{filename}" .
701             " to position $this->{data_offset}: $!"
702 0 0         unless seek $this->{file_handle}, $this->{data_offset}, 0;
703 0           my $buffer;
704 0           while(1)
705             {
706 0 0         if($err)
707             {
708 0 0         $node_tbl->unlock() unless $was_locked; # unlock if needed
709 0           close $fh;
710 0           unlink $filename;
711 0           return $err;
712             }
713              
714 0           my $bytecount = read $this->{file_handle}, $buffer, 65536;
715 0 0         if(not defined $bytecount)
    0          
716             {
717 0           $err = "send_this_message(): Problem reading input file " .
718             "$this->{filename}: $!";
719             }
720             elsif($bytecount == 0)
721             {
722 0           last; # EOF
723             }
724             else
725             {
726 0 0         print $fh $buffer
727             or $err = "send_this_message(): Problem writing output " .
728             "file $filename: $!";
729             }
730             }
731 0           close $fh;
732 0           chmod $FILEMODE, $filename;
733             }
734              
735 0           my $custom_headers = {};
736 0           $custom_headers->{'x-emdis-hub-rcv'} = $rcv_node_id;
737 0           $custom_headers->{'x-emdis-hub-snd'} = $cfg->THIS_NODE;
738              
739 0 0         if($num_parts == 1)
740             {
741             # read all data, send single email message
742             $err = "Unable to position file pointer for file $this->{filename}" .
743             " to position $this->{data_offset}: $!"
744 0 0         unless seek $this->{file_handle}, $this->{data_offset}, 0;
745              
746 0 0         if(not $err)
747             {
748 0           my $all_data;
749 0           my $bytecount = read $this->{file_handle}, $all_data, $data_size;
750              
751 0 0 0       if(not defined $bytecount)
    0          
    0          
752             {
753 0           $err = "send_this_message(): Problem reading input file " .
754             "$this->{filename}: $!";
755             }
756             elsif($bytecount != $data_size)
757             {
758 0           $err = "send_this_message(): Problem reading from input file " .
759             "$this->{filename}: expected $msg_part_size bytes, " .
760             "found $bytecount bytes.";
761             }
762             elsif($this->{is_meta_message}
763             and ($node->{encr_meta} !~ /true/io))
764             {
765             # don't encrypt meta-message
766 0 0 0       if(is_yes($cfg->ENABLE_AMQP) and exists $node->{amqp_addr_meta} and $node->{amqp_addr_meta}) {
      0        
767             # send meta-message via AMQP (if indicated by node config)
768             return send_amqp_message(
769             $node->{amqp_addr_meta},
770 0           $subject,
771             $node,
772             $custom_headers,
773             $all_data);
774             }
775             else {
776 0           $err = send_email($node->{addr}, $subject, undef, $all_data);
777             }
778             }
779             else
780             {
781             # send encrypted message
782             $err = send_encrypted_message(
783             $node->{encr_typ},
784             $node->{addr_r},
785             $node->{addr},
786             $node->{encr_out_keyid},
787             $node->{encr_out_passphrase},
788 0           $node,
789             $subject,
790             $custom_headers,
791             $all_data);
792             }
793             }
794             }
795             else
796             {
797             # process message parts ...
798              
799 0           my $min_part_num = 1;
800 0           my $max_part_num = $num_parts;
801 0 0         if($part_num)
802             {
803             # if $part_num specified, limit to that $part_num
804 0           $min_part_num = $part_num;
805 0           $max_part_num = $part_num;
806             }
807              
808             # iterate through message part(s), send email message(s)
809 0           my $parts_sent = 0;
810 0           for($part_num = $min_part_num; $part_num <= $max_part_num; $part_num++)
811             {
812             my $part_offset = $this->{data_offset} +
813 0           ($part_num -1) * $msg_part_size;
814             $err = "Unable to position file pointer for file " .
815             "$this->{filename} to position $this->{data_offset}: $!"
816 0 0         unless seek $this->{file_handle}, $part_offset, 0;
817              
818 0 0         if(not $err)
819             {
820 0           my $part_data;
821 0           my $bytecount = read $this->{file_handle}, $part_data,
822             $msg_part_size;
823              
824 0 0 0       if(not defined $bytecount)
    0          
    0          
825             {
826 0           $err = "send_this_message(): Problem reading input file " .
827             "$this->{filename}: $!";
828             }
829             elsif($part_num < $num_parts and $bytecount != $msg_part_size)
830             {
831 0           $err = "send_this_message(): Problem reading $rcv_node_id " .
832             "message part $part_num/$num_parts from input file " .
833             "$this->{filename}: expected $msg_part_size bytes, " .
834             "found $bytecount bytes.";
835             }
836             elsif($bytecount <= 0)
837             {
838 0           $err = "send_this_message(): Problem reading $rcv_node_id " .
839             "message part $part_num/$num_parts from input file " .
840             "$this->{filename}: found $bytecount bytes.";
841             }
842             else
843             {
844             # send encrypted email message
845             $err = send_encrypted_message(
846             $node->{encr_typ},
847             $node->{addr_r},
848             $node->{addr},
849             $node->{encr_out_keyid},
850             $node->{encr_out_passphrase},
851 0           $node,
852             "$subject:$part_num/$num_parts",
853             $custom_headers,
854             $part_data);
855             }
856             }
857              
858 0 0         if($err)
859             {
860 0 0         if($parts_sent == 0)
861             {
862             # nothing sent yet, so quit now (possible smtp problem?)
863 0           last;
864             }
865             else
866             {
867             # part of message was sent, so log error and continue
868 0           log_error($err);
869 0           $err = '';
870             }
871             }
872             else
873             {
874 0           $parts_sent++;
875             }
876             }
877             }
878              
879 0 0         if(not $err)
880             {
881             # update node last_out, possibly out_seq
882 0           $node->{last_out} = time();
883 0 0         $err = $node_tbl->ERROR
884             unless $node_tbl->write($rcv_node_id, $node);
885             }
886             $node_tbl->unlock() # unlock node_tbl if needed
887 0 0         unless $was_locked;
888              
889 0           return $err;
890             }
891              
892             1;
893              
894             __DATA__