--------------------- PatchSet 11309 Date: 2007/03/07 16:20:32 Author: hno Branch: HEAD Tag: (none) Log: Fix kqueue handling the almost theoretical case of overflowing the changes queue kevent() does not like being called with an changes list alone. If there is a single error in the changes list (i.e. an closed fd or similar) it stops immediately and it's impossible to know what got processed or not.. this patch reallocs the arrays as needed to fit the current workload, leaving only a single kevent call once per comm loop. Members: src/comm_kqueue.c:1.10->1.11 Index: squid/src/comm_kqueue.c =================================================================== RCS file: /cvsroot/squid/squid/src/comm_kqueue.c,v retrieving revision 1.10 retrieving revision 1.11 diff -u -r1.10 -r1.11 --- squid/src/comm_kqueue.c 23 Oct 2006 11:22:21 -0000 1.10 +++ squid/src/comm_kqueue.c 7 Mar 2007 16:20:32 -0000 1.11 @@ -1,6 +1,6 @@ /* - * $Id: comm_kqueue.c,v 1.10 2006/10/23 11:22:21 hno Exp $ + * $Id: comm_kqueue.c,v 1.11 2007/03/07 16:20:32 hno Exp $ * * DEBUG: section 5 Socket Functions * @@ -39,16 +39,17 @@ #include #endif -#define KE_LENGTH 128 +#define KE_QUEUE_STEP 128 #define STATE_READ 1 #define STATE_WRITE 2 static int kq; static struct timespec zero_timespec; -static struct kevent *kqlst; /* kevent buffer */ +static struct kevent *kqlst; /* change buffer */ +static struct kevent *ke; /* event buffer */ static int kqmax; /* max structs to buffer */ static int kqoff; /* offset into the buffer */ -static unsigned *kqueue_state; /* keep track of the kqueue state */ +static unsigned char *kqueue_state; /* keep track of the kqueue state */ static void do_select_init() @@ -58,8 +59,9 @@ fatalf("comm_select_init: kqueue(): %s\n", xstrerror()); fd_open(kq, FD_UNKNOWN, "kqueue ctl"); commSetCloseOnExec(kq); - kqmax = getdtablesize(); + kqmax = KE_QUEUE_STEP; kqlst = xmalloc(sizeof(*kqlst) * kqmax); + ke = xmalloc(sizeof(*ke) * kqmax); kqueue_state = xcalloc(Squid_MaxFD, sizeof(*kqueue_state)); zero_timespec.tv_sec = 0; zero_timespec.tv_nsec = 0; @@ -90,37 +92,38 @@ commSetEvents(int fd, int need_read, int need_write) { struct kevent *kep; - int st_read = (kqueue_state[fd] & STATE_READ) != 0; - int st_write = (kqueue_state[fd] & STATE_WRITE) != 0; + int st_new = (need_read ? STATE_READ : 0) | + (need_write ? STATE_WRITE : 0); + int st_change; assert(fd >= 0); - debug(5, 8) ("commSetEvents(fd=%d)\n", fd); + debug(5, 8) ("commSetEvents(fd=%d, read=%d, write=%d)\n", fd, need_read, need_write); + st_change = kqueue_state[fd] ^ st_new; + if (!st_change) + return; + + if (kqoff >= kqmax - 2) { + kqmax = kqmax + KE_QUEUE_STEP; + assert(kqmax < Squid_MaxFD * 4); + kqlst = xrealloc(kqlst, sizeof(*kqlst) * kqmax); + ke = xrealloc(ke, sizeof(*ke) * kqmax); + } kep = kqlst + kqoff; - if (need_read != st_read) { + if (st_change & STATE_READ) { EV_SET(kep, (uintptr_t) fd, EVFILT_READ, need_read ? EV_ADD : EV_DELETE, 0, 0, 0); kqoff++; kep++; } - if (need_write != st_write) { + if (st_change & STATE_WRITE) { EV_SET(kep, (uintptr_t) fd, EVFILT_WRITE, need_write ? EV_ADD : EV_DELETE, 0, 0, 0); kqoff++; kep++; } - kqueue_state[fd] = (need_read ? STATE_READ : 0) | - (need_write ? STATE_WRITE : 0); - - if (kqoff >= kqmax - 2) { - if (kevent(kq, kqlst, kqoff, NULL, 0, &zero_timespec) == -1) { - debug(5, 1) ("commSetEvents(): kevent() failed on fd=%d: %s\n", - fd, xstrerror()); - return; - } - kqoff = 0; - } + kqueue_state[fd] = st_new; } static int @@ -128,14 +131,13 @@ { int i; int num; - static struct kevent ke[KE_LENGTH]; struct timespec timeout; timeout.tv_sec = msec / 1000; timeout.tv_nsec = (msec % 1000) * 1000000; statCounter.syscalls.polls++; - num = kevent(kq, kqlst, kqoff, ke, KE_LENGTH, &timeout); + num = kevent(kq, kqlst, kqoff, ke, kqmax, &timeout); kqoff = 0; if (num < 0) { getCurrentTime(); @@ -171,5 +173,10 @@ } } + if (num >= kqmax) { + kqmax = kqmax + KE_QUEUE_STEP; + kqlst = xrealloc(kqlst, sizeof(*kqlst) * kqmax); + ke = xrealloc(ke, sizeof(*ke) * kqmax); + } return COMM_OK; }