File Coverage

blib/lib/Hypersonic/Event/IOCP.pm
Criterion Covered Total %
statement 12 44 27.2
branch 0 2 0.0
condition n/a
subroutine 5 22 22.7
pod 13 18 72.2
total 30 86 34.8


line stmt bran cond sub pod time code
1             package Hypersonic::Event::IOCP;
2              
3 2     2   196395 use strict;
  2         3  
  2         56  
4 2     2   6 use warnings;
  2         2  
  2         73  
5 2     2   23 use 5.010;
  2         9  
6              
7 2     2   6 use parent 'Hypersonic::Event::Role';
  2         7  
  2         10  
8              
9             our $VERSION = '0.17';
10              
11 0     0 1 0 sub name { 'iocp' }
12              
13             sub available {
14 3     3 1 26 return $^O eq 'MSWin32';
15             }
16              
17             sub includes {
18 0     0 1   return <<'C';
19             #include
20             #include
21             #include
22             #pragma comment(lib, "ws2_32.lib")
23             #pragma comment(lib, "mswsock.lib")
24             C
25             }
26              
27             sub defines {
28 0     0 1   return <<'C';
29             #define EV_BACKEND_IOCP 1
30             #ifndef MAX_EVENTS
31             #define MAX_EVENTS 1024
32             #endif
33              
34             /* Operation types for OVERLAPPED tracking */
35             #define OP_ACCEPT 1
36             #define OP_READ 2
37             #define OP_WRITE 3
38              
39             /* Per-I/O data structure */
40             typedef struct {
41             OVERLAPPED overlapped;
42             WSABUF wsa_buf;
43             char buffer[65536];
44             int op_type;
45             SOCKET socket;
46             } PER_IO_DATA;
47              
48             /* AcceptEx function pointer (loaded dynamically) */
49             static LPFN_ACCEPTEX lpfnAcceptEx = NULL;
50              
51             /* Per-IO data for Hypersonic::UA::Async slot tracking.
52             * One ASYNC_SLOT_IO is allocated per pending readiness notification
53             * and freed in gen_get_slot once the completion is delivered.
54             * Slot is also passed as the IOCP completion key (ULONG_PTR), so
55             * gen_get_slot recovers it cheaply via lpCompletionKey, falling
56             * back to overlapped->slot when the kernel delivers a NULL key. */
57             typedef struct {
58             OVERLAPPED overlapped;
59             int slot;
60             char peek_buf[1];
61             } ASYNC_SLOT_IO;
62             C
63             }
64              
65 0     0 1   sub event_struct { 'OVERLAPPED_ENTRY' }
66              
67 0     0 1   sub extra_cflags { '' }
68 0     0 1   sub extra_ldflags { '-lws2_32 -lmswsock' }
69              
70             # Create the IOCP completion port without an associated listen socket.
71             # Used by Hypersonic::UA::Async tick path (no server listener required).
72             sub gen_create_loop {
73 0     0 0   my ($class, $builder, $loop_var) = @_;
74              
75 0           $builder->comment('IOCP loop create (no listen socket - UA::Async tick path)')
76             ->line('WSADATA wsa_data;')
77             ->if('WSAStartup(MAKEWORD(2,2), &wsa_data) != 0')
78             ->line('croak("WSAStartup failed");')
79             ->endif
80             ->line('HANDLE iocp_loop = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);')
81             ->if('iocp_loop == NULL')
82             ->line('WSACleanup();')
83             ->line('croak("CreateIoCompletionPort (loop) failed");')
84             ->endif
85             ->line("$loop_var = (int)(intptr_t)iocp_loop;");
86             }
87              
88             # ============================================================
89             # UA::Async slot integration (stub)
90             # ------------------------------------------------------------
91             # IOCP is completion-based, not readiness-based, so we fake
92             # readiness by posting a 1-byte WSARecv with MSG_PEEK (read) or
93             # synthesizing an immediate completion via PostQueuedCompletionStatus
94             # (write). The slot is carried both as the IOCP completion key
95             # (ULONG_PTR) and inside the per-IO ASYNC_SLOT_IO struct, so
96             # gen_get_slot can recover it from either source.
97             # This is enough to drive Hypersonic::UA::Async's tick loop on
98             # native Win32; high-throughput async users should still prefer
99             # the dedicated IOCP path in the future-server JIT.
100             # ============================================================
101              
102             sub gen_add_with_slot {
103 0     0 0   my ($class, $builder, $loop_var, $fd_var, $slot_var, $events) = @_;
104              
105 0           $builder->line('{')
106             ->line(" HANDLE _iocp = (HANDLE)(intptr_t)$loop_var;")
107             ->comment(' Associate fd with IOCP (slot as completion key); idempotent / errors ignored.')
108             ->line(" CreateIoCompletionPort((HANDLE)(intptr_t)$fd_var, _iocp, (ULONG_PTR)$slot_var, 0);")
109             ->line(' ASYNC_SLOT_IO *_io = (ASYNC_SLOT_IO *)calloc(1, sizeof(ASYNC_SLOT_IO));')
110             ->if('!_io')
111             ->line('croak("calloc(ASYNC_SLOT_IO) failed");')
112             ->endif
113             ->line(" _io->slot = $slot_var;");
114              
115 0 0         if ($events eq 'write') {
116 0           $builder->comment(' Writability: synthesize an immediate completion.')
117             ->line(" PostQueuedCompletionStatus(_iocp, 0, (ULONG_PTR)$slot_var, &_io->overlapped);");
118             }
119             else {
120 0           $builder->comment(' Readability: post a 1-byte WSARecv with MSG_PEEK so we get notified')
121             ->comment(' when data arrives without consuming it from the socket buffer.')
122             ->line(' WSABUF _buf;')
123             ->line(' _buf.buf = _io->peek_buf;')
124             ->line(' _buf.len = 1;')
125             ->line(' DWORD _flags = MSG_PEEK;')
126             ->line(' DWORD _bytes = 0;')
127             ->line(" int _rc = WSARecv((SOCKET)$fd_var, &_buf, 1, &_bytes, &_flags, &_io->overlapped, NULL);")
128             ->line(' if (_rc == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {')
129             ->comment(' Failed (e.g. socket closed): synthesize completion so caller can advance state.')
130             ->line(" PostQueuedCompletionStatus(_iocp, 0, (ULONG_PTR)$slot_var, &_io->overlapped);")
131             ->line(' }');
132             }
133 0           $builder->line('}');
134             }
135              
136             sub gen_wait_once {
137 0     0 0   my ($class, $builder, $loop_var, $events_var, $count_var, $timeout_ms) = @_;
138              
139 0           $builder->line('{')
140             ->line(" HANDLE _iocp = (HANDLE)(intptr_t)$loop_var;")
141             ->line(' ULONG _removed = 0;')
142             ->line(" BOOL _ok = GetQueuedCompletionStatusEx(_iocp, $events_var, MAX_EVENTS, &_removed, (DWORD)($timeout_ms), FALSE);")
143             ->line(" $count_var = _ok ? (int)_removed : 0;")
144             ->line('}');
145             }
146              
147             sub gen_get_slot {
148 0     0 0   my ($class, $builder, $events_var, $index_var, $slot_var) = @_;
149              
150 0           $builder->line("int $slot_var = -1;")
151             ->line('{')
152             ->line(" OVERLAPPED_ENTRY *_entry = &${events_var}" . "[$index_var];")
153             ->line(" $slot_var = (int)(intptr_t)_entry->lpCompletionKey;")
154             ->line(' if (_entry->lpOverlapped) {')
155             ->line(' ASYNC_SLOT_IO *_io = CONTAINING_RECORD(_entry->lpOverlapped, ASYNC_SLOT_IO, overlapped);')
156             ->line(" if ($slot_var < 0) $slot_var = _io->slot;")
157             ->line(' free(_io);')
158             ->line(' }')
159             ->line('}');
160             }
161              
162             # IOCP is completion-based, fundamentally different from readiness-based APIs
163             sub gen_create {
164 0     0 1   my ($class, $builder, $listen_fd_var) = @_;
165              
166 0           $builder->comment('IOCP backend - Windows I/O Completion Ports')
167             ->comment('High-performance completion-based I/O')
168             ->blank
169             ->comment('Initialize Winsock')
170             ->line('WSADATA wsa_data;')
171             ->if('WSAStartup(MAKEWORD(2,2), &wsa_data) != 0')
172             ->line('croak("WSAStartup failed");')
173             ->endif
174             ->blank
175             ->comment('Create I/O Completion Port')
176             ->line('HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);')
177             ->if('iocp == NULL')
178             ->line('WSACleanup();')
179             ->line('croak("CreateIoCompletionPort failed");')
180             ->endif
181             ->blank
182             ->comment('Associate listen socket with IOCP')
183             ->line("if (CreateIoCompletionPort((HANDLE)$listen_fd_var, iocp, (ULONG_PTR)$listen_fd_var, 0) == NULL) {")
184             ->line(' CloseHandle(iocp);')
185             ->line(' WSACleanup();')
186             ->line(' croak("Failed to associate listen socket with IOCP");')
187             ->line('}')
188             ->blank
189             ->comment('Load AcceptEx function')
190             ->line('GUID guid_acceptex = WSAID_ACCEPTEX;')
191             ->line('DWORD bytes;')
192             ->line("if (WSAIoctl($listen_fd_var, SIO_GET_EXTENSION_FUNCTION_POINTER,")
193             ->line(' &guid_acceptex, sizeof(guid_acceptex),')
194             ->line(' &lpfnAcceptEx, sizeof(lpfnAcceptEx),')
195             ->line(' &bytes, NULL, NULL) == SOCKET_ERROR) {')
196             ->line(' CloseHandle(iocp);')
197             ->line(' WSACleanup();')
198             ->line(' croak("Failed to load AcceptEx");')
199             ->line('}')
200             ->blank
201             ->comment('Post initial AcceptEx')
202             ->line('PER_IO_DATA* accept_data = (PER_IO_DATA*)calloc(1, sizeof(PER_IO_DATA));')
203             ->line('accept_data->op_type = OP_ACCEPT;')
204             ->line('accept_data->socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);')
205             ->line("lpfnAcceptEx($listen_fd_var, accept_data->socket, accept_data->buffer,")
206             ->line(' 0, sizeof(struct sockaddr_in) + 16, sizeof(struct sockaddr_in) + 16,')
207             ->line(' &bytes, &accept_data->overlapped);')
208             ->blank
209             ->line('int ev_fd = (int)(intptr_t)iocp;') # Store IOCP handle as ev_fd
210             ->blank;
211             }
212              
213             # Associate socket with IOCP and post read operation
214             sub gen_add {
215 0     0 1   my ($class, $builder, $loop_var, $fd_var) = @_;
216              
217 0           $builder->line("HANDLE iocp_handle = (HANDLE)(intptr_t)$loop_var;")
218             ->line("CreateIoCompletionPort((HANDLE)$fd_var, iocp_handle, (ULONG_PTR)$fd_var, 0);")
219             ->blank
220             ->comment('Post initial read operation')
221             ->line('PER_IO_DATA* io_data = (PER_IO_DATA*)calloc(1, sizeof(PER_IO_DATA));')
222             ->line('io_data->op_type = OP_READ;')
223             ->line("io_data->socket = $fd_var;")
224             ->line('io_data->wsa_buf.buf = io_data->buffer;')
225             ->line('io_data->wsa_buf.len = sizeof(io_data->buffer);')
226             ->line('DWORD flags = 0;')
227             ->line("WSARecv($fd_var, &io_data->wsa_buf, 1, NULL, &flags, &io_data->overlapped, NULL);");
228             }
229              
230             # Cancel pending I/O and close socket
231             sub gen_del {
232 0     0 1   my ($class, $builder, $loop_var, $fd_var) = @_;
233              
234 0           $builder->line("CancelIo((HANDLE)$fd_var);")
235             ->line("closesocket($fd_var);");
236             }
237              
238             # Wait for completions using GetQueuedCompletionStatusEx
239             sub gen_wait {
240 0     0 1   my ($class, $builder, $loop_var, $events_var, $count_var, $timeout_var) = @_;
241              
242 0           $builder->line("HANDLE iocp_handle = (HANDLE)(intptr_t)$loop_var;")
243             ->line("static OVERLAPPED_ENTRY $events_var" . "[MAX_EVENTS];")
244             ->line("ULONG $count_var = 0;")
245             ->blank
246             ->line("BOOL ok = GetQueuedCompletionStatusEx(iocp_handle, $events_var, MAX_EVENTS, &$count_var, $timeout_var, FALSE);")
247             ->if('!ok')
248             ->if('GetLastError() == WAIT_TIMEOUT')
249             ->line('continue;')
250             ->endif
251             ->line('perror("GetQueuedCompletionStatusEx");')
252             ->line('break;')
253             ->endif;
254             }
255              
256             # Extract socket and operation type from completion
257             sub gen_get_fd {
258 0     0 1   my ($class, $builder, $events_var, $index_var, $fd_var) = @_;
259              
260 0           $builder->line("OVERLAPPED_ENTRY* entry = &${events_var}[$index_var];")
261             ->line('PER_IO_DATA* io_data = CONTAINING_RECORD(entry->lpOverlapped, PER_IO_DATA, overlapped);')
262             ->line("int $fd_var = (int)io_data->socket;")
263             ->line('int op_type = io_data->op_type;')
264             ->line('DWORD bytes_transferred = entry->dwNumberOfBytesTransferred;')
265             ->blank
266             ->if('op_type == OP_ACCEPT')
267             ->comment('Accept completed - io_data->socket is the new client')
268             ->line("$fd_var = listen_fd;") # Signal this was accept
269             ->line('int client_fd = (int)io_data->socket;')
270             ->elsif('op_type == OP_READ')
271             ->if('bytes_transferred == 0')
272             ->comment('Connection closed')
273             ->line("closesocket($fd_var);")
274             ->line('free(io_data);')
275             ->line('g_active_connections--;')
276             ->line('continue;')
277             ->endif
278             ->comment('Data received - already in io_data->buffer')
279             ->endif;
280             }
281              
282             # Cleanup IOCP resources
283             sub gen_cleanup {
284 0     0 1   my ($class, $builder) = @_;
285              
286 0           $builder->line('CloseHandle(iocp);')
287             ->line('WSACleanup();');
288             }
289              
290             # Future/Pool integration - IOCP uses different mechanism for pool notify
291             # On Windows, pool uses a pipe which can be added to IOCP via overlapped read
292             sub gen_add_pool_notify {
293 0     0 0   my ($class, $builder, $loop_var, $notify_fd_var) = @_;
294              
295 0           $builder->line("/* Add pool notify fd to IOCP */")
296             ->line("HANDLE iocp_handle = (HANDLE)(intptr_t)$loop_var;")
297             ->line("CreateIoCompletionPort((HANDLE)$notify_fd_var, iocp_handle, (ULONG_PTR)$notify_fd_var, 0);")
298             ->line('PER_IO_DATA* pool_io = (PER_IO_DATA*)calloc(1, sizeof(PER_IO_DATA));')
299             ->line('pool_io->op_type = 4;') # New type for pool notify
300             ->line("pool_io->socket = $notify_fd_var;")
301             ->line('pool_io->wsa_buf.buf = pool_io->buffer;')
302             ->line('pool_io->wsa_buf.len = 1;')
303             ->line('DWORD flags = 0;')
304             ->line("ReadFile((HANDLE)$notify_fd_var, pool_io->buffer, 1, NULL, &pool_io->overlapped);");
305             }
306              
307             1;
308              
309             __END__