File Coverage

blib/lib/Hypersonic/Event/IOCP.pm
Criterion Covered Total %
statement 12 32 37.5
branch n/a
condition n/a
subroutine 5 18 27.7
pod 13 14 92.8
total 30 64 46.8


line stmt bran cond sub pod time code
1             package Hypersonic::Event::IOCP;
2              
3 2     2   201080 use strict;
  2         8  
  2         79  
4 2     2   9 use warnings;
  2         2  
  2         98  
5 2     2   25 use 5.010;
  2         6  
6              
7 2     2   6 use parent 'Hypersonic::Event::Role';
  2         2  
  2         36  
8              
9             our $VERSION = '0.15';
10              
11 0     0 1 0 sub name { 'iocp' }
12              
13             sub available {
14 3     3 1 27 return $^O eq 'MSWin32';
15             }
16              
17             sub includes {
18 0     0 1   return <<'C';
19             #include
20             #include
21             #include
22             #pragma comment(lib, "ws2_32.lib")
23             #pragma comment(lib, "mswsock.lib")
24             C
25             }
26              
27             sub defines {
28 0     0 1   return <<'C';
29             #define EV_BACKEND_IOCP 1
30             #ifndef MAX_EVENTS
31             #define MAX_EVENTS 1024
32             #endif
33              
34             /* Operation types for OVERLAPPED tracking */
35             #define OP_ACCEPT 1
36             #define OP_READ 2
37             #define OP_WRITE 3
38              
39             /* Per-I/O data structure */
40             typedef struct {
41             OVERLAPPED overlapped;
42             WSABUF wsa_buf;
43             char buffer[65536];
44             int op_type;
45             SOCKET socket;
46             } PER_IO_DATA;
47              
48             /* AcceptEx function pointer (loaded dynamically) */
49             static LPFN_ACCEPTEX lpfnAcceptEx = NULL;
50             C
51             }
52              
53 0     0 1   sub event_struct { 'OVERLAPPED_ENTRY' }
54              
55 0     0 1   sub extra_cflags { '' }
56 0     0 1   sub extra_ldflags { '-lws2_32 -lmswsock' }
57              
58             # IOCP is completion-based, fundamentally different from readiness-based APIs
59             sub gen_create {
60 0     0 1   my ($class, $builder, $listen_fd_var) = @_;
61              
62 0           $builder->comment('IOCP backend - Windows I/O Completion Ports')
63             ->comment('High-performance completion-based I/O')
64             ->blank
65             ->comment('Initialize Winsock')
66             ->line('WSADATA wsa_data;')
67             ->if('WSAStartup(MAKEWORD(2,2), &wsa_data) != 0')
68             ->line('croak("WSAStartup failed");')
69             ->endif
70             ->blank
71             ->comment('Create I/O Completion Port')
72             ->line('HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);')
73             ->if('iocp == NULL')
74             ->line('WSACleanup();')
75             ->line('croak("CreateIoCompletionPort failed");')
76             ->endif
77             ->blank
78             ->comment('Associate listen socket with IOCP')
79             ->line("if (CreateIoCompletionPort((HANDLE)$listen_fd_var, iocp, (ULONG_PTR)$listen_fd_var, 0) == NULL) {")
80             ->line(' CloseHandle(iocp);')
81             ->line(' WSACleanup();')
82             ->line(' croak("Failed to associate listen socket with IOCP");')
83             ->line('}')
84             ->blank
85             ->comment('Load AcceptEx function')
86             ->line('GUID guid_acceptex = WSAID_ACCEPTEX;')
87             ->line('DWORD bytes;')
88             ->line("if (WSAIoctl($listen_fd_var, SIO_GET_EXTENSION_FUNCTION_POINTER,")
89             ->line(' &guid_acceptex, sizeof(guid_acceptex),')
90             ->line(' &lpfnAcceptEx, sizeof(lpfnAcceptEx),')
91             ->line(' &bytes, NULL, NULL) == SOCKET_ERROR) {')
92             ->line(' CloseHandle(iocp);')
93             ->line(' WSACleanup();')
94             ->line(' croak("Failed to load AcceptEx");')
95             ->line('}')
96             ->blank
97             ->comment('Post initial AcceptEx')
98             ->line('PER_IO_DATA* accept_data = (PER_IO_DATA*)calloc(1, sizeof(PER_IO_DATA));')
99             ->line('accept_data->op_type = OP_ACCEPT;')
100             ->line('accept_data->socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);')
101             ->line("lpfnAcceptEx($listen_fd_var, accept_data->socket, accept_data->buffer,")
102             ->line(' 0, sizeof(struct sockaddr_in) + 16, sizeof(struct sockaddr_in) + 16,')
103             ->line(' &bytes, &accept_data->overlapped);')
104             ->blank
105             ->line('int ev_fd = (int)(intptr_t)iocp;') # Store IOCP handle as ev_fd
106             ->blank;
107             }
108              
109             # Associate socket with IOCP and post read operation
110             sub gen_add {
111 0     0 1   my ($class, $builder, $loop_var, $fd_var) = @_;
112              
113 0           $builder->line("HANDLE iocp_handle = (HANDLE)(intptr_t)$loop_var;")
114             ->line("CreateIoCompletionPort((HANDLE)$fd_var, iocp_handle, (ULONG_PTR)$fd_var, 0);")
115             ->blank
116             ->comment('Post initial read operation')
117             ->line('PER_IO_DATA* io_data = (PER_IO_DATA*)calloc(1, sizeof(PER_IO_DATA));')
118             ->line('io_data->op_type = OP_READ;')
119             ->line("io_data->socket = $fd_var;")
120             ->line('io_data->wsa_buf.buf = io_data->buffer;')
121             ->line('io_data->wsa_buf.len = sizeof(io_data->buffer);')
122             ->line('DWORD flags = 0;')
123             ->line("WSARecv($fd_var, &io_data->wsa_buf, 1, NULL, &flags, &io_data->overlapped, NULL);");
124             }
125              
126             # Cancel pending I/O and close socket
127             sub gen_del {
128 0     0 1   my ($class, $builder, $loop_var, $fd_var) = @_;
129              
130 0           $builder->line("CancelIo((HANDLE)$fd_var);")
131             ->line("closesocket($fd_var);");
132             }
133              
134             # Wait for completions using GetQueuedCompletionStatusEx
135             sub gen_wait {
136 0     0 1   my ($class, $builder, $loop_var, $events_var, $count_var, $timeout_var) = @_;
137              
138 0           $builder->line("HANDLE iocp_handle = (HANDLE)(intptr_t)$loop_var;")
139             ->line("static OVERLAPPED_ENTRY $events_var" . "[MAX_EVENTS];")
140             ->line("ULONG $count_var = 0;")
141             ->blank
142             ->line("BOOL ok = GetQueuedCompletionStatusEx(iocp_handle, $events_var, MAX_EVENTS, &$count_var, $timeout_var, FALSE);")
143             ->if('!ok')
144             ->if('GetLastError() == WAIT_TIMEOUT')
145             ->line('continue;')
146             ->endif
147             ->line('perror("GetQueuedCompletionStatusEx");')
148             ->line('break;')
149             ->endif;
150             }
151              
152             # Extract socket and operation type from completion
153             sub gen_get_fd {
154 0     0 1   my ($class, $builder, $events_var, $index_var, $fd_var) = @_;
155              
156 0           $builder->line("OVERLAPPED_ENTRY* entry = &${events_var}[$index_var];")
157             ->line('PER_IO_DATA* io_data = CONTAINING_RECORD(entry->lpOverlapped, PER_IO_DATA, overlapped);')
158             ->line("int $fd_var = (int)io_data->socket;")
159             ->line('int op_type = io_data->op_type;')
160             ->line('DWORD bytes_transferred = entry->dwNumberOfBytesTransferred;')
161             ->blank
162             ->if('op_type == OP_ACCEPT')
163             ->comment('Accept completed - io_data->socket is the new client')
164             ->line("$fd_var = listen_fd;") # Signal this was accept
165             ->line('int client_fd = (int)io_data->socket;')
166             ->elsif('op_type == OP_READ')
167             ->if('bytes_transferred == 0')
168             ->comment('Connection closed')
169             ->line("closesocket($fd_var);")
170             ->line('free(io_data);')
171             ->line('g_active_connections--;')
172             ->line('continue;')
173             ->endif
174             ->comment('Data received - already in io_data->buffer')
175             ->endif;
176             }
177              
178             # Cleanup IOCP resources
179             sub gen_cleanup {
180 0     0 1   my ($class, $builder) = @_;
181              
182 0           $builder->line('CloseHandle(iocp);')
183             ->line('WSACleanup();');
184             }
185              
186             # Future/Pool integration - IOCP uses different mechanism for pool notify
187             # On Windows, pool uses a pipe which can be added to IOCP via overlapped read
188             sub gen_add_pool_notify {
189 0     0 0   my ($class, $builder, $loop_var, $notify_fd_var) = @_;
190              
191 0           $builder->line("/* Add pool notify fd to IOCP */")
192             ->line("HANDLE iocp_handle = (HANDLE)(intptr_t)$loop_var;")
193             ->line("CreateIoCompletionPort((HANDLE)$notify_fd_var, iocp_handle, (ULONG_PTR)$notify_fd_var, 0);")
194             ->line('PER_IO_DATA* pool_io = (PER_IO_DATA*)calloc(1, sizeof(PER_IO_DATA));')
195             ->line('pool_io->op_type = 4;') # New type for pool notify
196             ->line("pool_io->socket = $notify_fd_var;")
197             ->line('pool_io->wsa_buf.buf = pool_io->buffer;')
198             ->line('pool_io->wsa_buf.len = 1;')
199             ->line('DWORD flags = 0;')
200             ->line("ReadFile((HANDLE)$notify_fd_var, pool_io->buffer, 1, NULL, &pool_io->overlapped);");
201             }
202              
203             1;
204              
205             __END__