5
|
|
|
|
|
|
|
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#define ECB_NO_LIBM 1
#define ECB_NO_THREADS 1
#include "ecb.h"
#include "schmorp.h"
typedef volatile sig_atomic_t atomic_t;
static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
static Sighandler_t old_sighandler;
static atomic_t async_pending;
#define PERL_VERSION_ATLEAST(a,b,c) \
(PERL_REVISION > (a) \
|| (PERL_REVISION == (a) \
&& (PERL_VERSION > (b) \
|| (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
#if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
# define HAS_SA_SIGINFO 1
#endif
#if !PERL_VERSION_ATLEAST(5,10,0)
# undef HAS_SA_SIGINFO
#endif
/*****************************************************************************/
typedef struct {
SV *cb;
void (*c_cb)(pTHX_ void *c_arg, int value);
void *c_arg;
SV *fh_r, *fh_w;
SV *value;
int signum;
int autodrain;
ANY *scope_savestack;
volatile int blocked;
s_epipe ep;
int fd_wlen;
atomic_t fd_enable;
atomic_t pending;
volatile IV *valuep;
atomic_t hysteresis;
} async_t;
static AV *asyncs;
static async_t *sig_async [SIG_SIZE];
#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
static void async_signal (void *signal_arg, int value);
static void
setsig (int signum, void (*handler)(int))
{
#if _WIN32
signal (signum, handler);
#else
struct sigaction sa;
sa.sa_handler = handler;
sigfillset (&sa.sa_mask);
sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
sigaction (signum, &sa, 0);
#endif
}
static void
async_sigsend (int signum)
{
async_signal (sig_async [signum], 0);
}
/* the main workhorse to signal */
static void
async_signal (void *signal_arg, int value)
{
static char pipedata [8];
async_t *async = (async_t *)signal_arg;
int pending = async->pending;
if (async->hysteresis)
setsig (async->signum, SIG_IGN);
*async->valuep = value ? value : 1;
ECB_MEMORY_FENCE_RELEASE;
async->pending = 1;
ECB_MEMORY_FENCE_RELEASE;
async_pending = 1;
ECB_MEMORY_FENCE_RELEASE;
if (!async->blocked)
{
psig_pend [9] = 1;
ECB_MEMORY_FENCE_RELEASE;
*sig_pending = 1;
ECB_MEMORY_FENCE_RELEASE;
}
if (!pending && async->fd_enable && async->ep.len)
s_epipe_signal (&async->ep);
}
static void
handle_async (async_t *async)
{
int old_errno = errno;
int value = *async->valuep;
*async->valuep = 0;
async->pending = 0;
/* restore signal */
if (async->hysteresis)
setsig (async->signum, async_sigsend);
/* drain pipe */
if (async->fd_enable && async->ep.len && async->autodrain)
s_epipe_drain (&async->ep);
if (async->c_cb)
{
dTHX;
async->c_cb (aTHX_ async->c_arg, value);
}
if (async->cb)
{
dSP;
SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
SV *savedie = PL_diehook;
PL_diehook = 0;
PUSHSTACKi (PERLSI_SIGNAL);
PUSHMARK (SP);
XPUSHs (sv_2mortal (newSViv (value)));
PUTBACK;
call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
if (SvTRUE (ERRSV))
{
SPAGAIN;
PUSHMARK (SP);
PUTBACK;
call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
sv_setpvn (ERRSV, "", 0);
}
if (saveerr)
sv_setsv (ERRSV, saveerr);
{
SV *oldhook = PL_diehook;
PL_diehook = savedie;
SvREFCNT_dec (oldhook);
}
POPSTACK;
}
errno = old_errno;
}
static void
handle_asyncs (void)
{
int i;
ECB_MEMORY_FENCE_ACQUIRE;
async_pending = 0;
for (i = AvFILLp (asyncs); i >= 0; --i)
{
SV *async_sv = AvARRAY (asyncs)[i];
async_t *async = SvASYNC_nrv (async_sv);
if (async->pending && !async->blocked)
{
/* temporarily keep a refcount */
SvREFCNT_inc (async_sv);
handle_async (async);
SvREFCNT_dec (async_sv);
/* the handler could have deleted any number of asyncs */
if (i > AvFILLp (asyncs))
i = AvFILLp (asyncs);
}
}
}
#if HAS_SA_SIGINFO
static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
{
if (signum == 9)
handle_asyncs ();
else
old_sighandler (signum, si, sarg);
}
#else
static Signal_t async_sighandler (int signum)
{
if (signum == 9)
handle_asyncs ();
else
old_sighandler (signum);
}
#endif
#define block(async) ++(async)->blocked
static void
unblock (async_t *async)
{
--async->blocked;
if (async->pending && !async->blocked)
handle_async (async);
}
static void
scope_block_cb (pTHX_ void *async_sv)
{
async_t *async = SvASYNC_nrv ((SV *)async_sv);
async->scope_savestack = 0;
unblock (async);
SvREFCNT_dec (async_sv);
}
static void
scope_block (SV *async_sv)
{
async_t *async = SvASYNC_nrv (async_sv);
/* as a heuristic, we skip the scope block if we already are blocked */
/* and the existing scope block used the same savestack */
if (!async->scope_savestack || async->scope_savestack != PL_savestack)
{
async->scope_savestack = PL_savestack;
block (async);
LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
}
}
MODULE = Async::Interrupt PACKAGE = Async::Interrupt
BOOT:
old_sighandler = PL_sighandlerp;
PL_sighandlerp = async_sighandler;
sig_pending = &PL_sig_pending;
psig_pend = PL_psig_pend;
asyncs = newAV ();
CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
PROTOTYPES: DISABLE
void
_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
PPCODE:
{
SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
async_t *async;
Newz (0, async, 1, async_t);
XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
/* TODO: need to bless right now to ensure deallocation */
av_push (asyncs, TOPs);
SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
if (SvOK (fh_r) || SvOK (fh_w))
{
int fd_r = s_fileno_croak (fh_r, 0);
int fd_w = s_fileno_croak (fh_w, 1);
async->fh_r = newSVsv (fh_r);
async->fh_w = newSVsv (fh_w);
async->ep.fd [0] = fd_r;
async->ep.fd [1] = fd_w;
async->ep.len = 1;
async->fd_enable = 1;
}
async->value = SvROK (pvalue)
? SvREFCNT_inc_NN (SvRV (pvalue))
: NEWSV (0, 0);
sv_setiv (async->value, 0);
SvIOK_only (async->value); /* just to be sure */
SvREADONLY_on (async->value);
async->valuep = &(SvIVX (async->value));
async->autodrain = 1;
async->cb = cv;
async->c_cb = c_cb;
async->c_arg = c_arg;
async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
if (async->signum)
{
if (async->signum < 0)
croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
sig_async [async->signum] = async;
setsig (async->signum, async_sigsend);
}
}
void
signal_hysteresis (async_t *async, int enable)
CODE:
async->hysteresis = enable;
void
signal_func (async_t *async)
PPCODE:
EXTEND (SP, 2);
PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
void
scope_block_func (SV *self)
PPCODE:
EXTEND (SP, 2);
PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
IV
c_var (async_t *async)
CODE:
RETVAL = PTR2IV (async->valuep);
OUTPUT:
RETVAL
void
handle (async_t *async)
CODE:
handle_async (async);
void
signal (async_t *async, int value = 1)
CODE:
async_signal (async, value);
void
block (async_t *async)
CODE:
block (async);
void
unblock (async_t *async)
CODE:
unblock (async);
void
scope_block (SV *self)
CODE:
scope_block (SvRV (self));
void
pipe_enable (async_t *async)
ALIAS:
pipe_enable = 1
pipe_disable = 0
CODE:
async->fd_enable = ix;
int
pipe_fileno (async_t *async)
CODE:
if (!async->ep.len)
{
int res;
/*block (async);*//*TODO*/
res = s_epipe_new (&async->ep);
async->fd_enable = 1;
/*unblock (async);*//*TODO*/
if (res < 0)
croak ("Async::Interrupt: unable to initialize event pipe");
}
RETVAL = async->ep.fd [0];
OUTPUT:
RETVAL
int
pipe_autodrain (async_t *async, int enable = -1)
CODE:
RETVAL = async->autodrain;
if (enable >= 0)
async->autodrain = enable;
OUTPUT:
RETVAL
void
pipe_drain (async_t *async)
CODE:
if (async->ep.len)
s_epipe_drain (&async->ep);
void
post_fork (async_t *async)
CODE:
if (async->ep.len)
{
int res;
/*block (async);*//*TODO*/
res = s_epipe_renew (&async->ep);
/*unblock (async);*//*TODO*/
if (res < 0)
croak ("Async::Interrupt: unable to initialize event pipe after fork");
}
void
DESTROY (SV *self)
CODE:
{
int i;
SV *async_sv = SvRV (self);
async_t *async = SvASYNC_nrv (async_sv);
for (i = AvFILLp (asyncs); i >= 0; --i)
if (AvARRAY (asyncs)[i] == async_sv)
{
AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
av_pop (asyncs);
goto found;
}
if (!PL_dirty)
warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
found:
if (async->signum)
setsig (async->signum, SIG_DFL);
if (!async->fh_r && async->ep.len)
s_epipe_destroy (&async->ep);
SvREFCNT_dec (async->fh_r);
SvREFCNT_dec (async->fh_w);
SvREFCNT_dec (async->cb);
SvREFCNT_dec (async->value);
Safefree (async);
}
SV *
sig2num (SV *signame_or_number)
ALIAS:
sig2num = 0
sig2name = 1
PROTOTYPE: $
CODE:
{
int signum = s_signum (signame_or_number);
if (signum < 0)
RETVAL = &PL_sv_undef;
else if (ix)
RETVAL = newSVpv (PL_sig_name [signum], 0);
else
RETVAL = newSViv (signum);
}
OUTPUT:
RETVAL
MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
void
new (const char *klass)
PPCODE:
{
s_epipe *epp;
Newz (0, epp, 1, s_epipe);
XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
if (s_epipe_new (epp) < 0)
croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
}
void
filenos (s_epipe *epp)
PPCODE:
EXTEND (SP, 2);
PUSHs (sv_2mortal (newSViv (epp->fd [0])));
PUSHs (sv_2mortal (newSViv (epp->fd [1])));
int
fileno (s_epipe *epp)
ALIAS:
fileno = 0
fileno_r = 0
fileno_w = 1
CODE:
RETVAL = epp->fd [ix];
OUTPUT:
RETVAL
int
type (s_epipe *epp)
CODE:
RETVAL = epp->len;
OUTPUT:
RETVAL
void
s_epipe_signal (s_epipe *epp)
void
s_epipe_drain (s_epipe *epp)
void
signal_func (s_epipe *epp)
ALIAS:
drain_func = 1
PPCODE:
EXTEND (SP, 2);
PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
void
s_epipe_wait (s_epipe *epp)
void
s_epipe_renew (s_epipe *epp)
void
DESTROY (s_epipe *epp)
CODE:
s_epipe_destroy (epp);
|