------------------------------------------------------------ revno: 13442 revision-id: squid3@treenet.co.nz-20140605082820-9th5ecz7lb7rfim9 parent: squid3@treenet.co.nz-20140604153016-xsix6wy08ixmvjtj committer: Amos Jeffries branch nick: trunk timestamp: Thu 2014-06-05 01:28:20 -0700 message: Update the Comm:: API for read(2) ... using an algorithm suggested by Alex Rousskov. The code for Comm:: read operations is shuffled into comm/libcomm.la and the files comm/Read.{h,cc} in symmetry with the current Comm::Write API. The new API consists of: * Comm::Read() which accepts the Comm::Connection pointer for the socket to read on and an AsyncCall callback to be run when read is ready. The Job is responsible for separately initiating read(2) or alternative action when that callback is run. * Comm::ReadNow() which accepts an SBuf buffer and a CommIoCbParams initialized to contain the Comm::Connection pointer for the socket to read on. TheCommIoCbParams will be filled out with result flag, xerrno, and size. This synchronously performs read(2) operations to append bytes to the provided buffer. It returns a comm_err_t flag for use in determining how to handle the results and signalling one of OK, INPROGRESS, ERROR, EOF as having happened. comm_read() API is retained for backward compatibility during the transitional period. However it is now deprecated and scheduled for removal ASAP. The SBuf overloaded variant is now removed. * Comm::ReadCancel() - a renaming of the comm_read_cancel() AsyncCall API. Other cancel API(s) are now deprecated and will be removed ASAP. Code using comm_read_cancel() with AsyncCall may immediately switch to this new API with no logic changes necessary even if they are not using other new Comm API calls. * Comm::MonitorsRead() - a renaming of comm_monitors_read() AsyncCall API. comm_monitors_read() is now removed. Other changes: - the unused comm_has_pending_read_callback() API is erased. - the IoCallback::buf2 mechanism previously used for SBuf read I/O is erased. - ConnStateData is converted to this new API for filling its SBuf I/O buffer and for monitoring pinned connection closures. - fde::readPending() converted to new Comm::MonitorsRead() API. - Comm half-closed monitoring feature is also converted to this new API. NP: one bug in ConnStateData handling of intercepted HTTPS traffic is noted but not fixed in this patch. ------------------------------------------------------------ # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: squid3@treenet.co.nz-20140605082820-9th5ecz7lb7rfim9 # target_branch: http://bzr.squid-cache.org/bzr/squid3/trunk/ # testament_sha1: db3bb5caca0aafa01ff1b21c635ded7fbde949af # timestamp: 2014-06-05 08:53:47 +0000 # source_branch: http://bzr.squid-cache.org/bzr/squid3/trunk/ # base_revision_id: squid3@treenet.co.nz-20140604153016-\ # xsix6wy08ixmvjtj # # Begin patch === modified file 'src/CommCalls.h' --- src/CommCalls.h 2014-03-15 11:42:55 +0000 +++ src/CommCalls.h 2014-06-05 08:28:20 +0000 @@ -106,8 +106,6 @@ bool syncWithComm(); // see CommCommonCbParams::syncWithComm }; -class SBuf; - // read/write (I/O) parameters class CommIoCbParams: public CommCommonCbParams { @@ -120,7 +118,6 @@ public: char *buf; size_t size; - SBuf *buf2; // alternative buffer for use when buf is unset }; // close parameters === modified file 'src/client_side.cc' --- src/client_side.cc 2014-05-21 06:29:38 +0000 +++ src/client_side.cc 2014-06-05 08:28:20 +0000 @@ -94,6 +94,7 @@ #include "comm.h" #include "comm/Connection.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommCalls.h" @@ -237,8 +238,8 @@ } /** - * This routine should be called to grow the inbuf and then - * call comm_read(). + * This routine should be called to grow the in.buf and then + * call Comm::Read(). */ void ConnStateData::readSomeData() @@ -253,7 +254,7 @@ typedef CommCbMemFunT Dialer; reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - comm_read(clientConnection, in.buf, reader); + Comm::Read(clientConnection, reader); } void @@ -2417,26 +2418,6 @@ } int -ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno) -{ - if (flag != COMM_OK) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag); - return 1; - } - - if (size < 0) { - if (!ignoreErrno(xerrno)) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno)); - return 1; - } else if (in.buf.isEmpty()) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")"); - } - } - - return 0; -} - -int ConnStateData::connFinishedWithConn(int size) { if (size == 0) { @@ -2984,14 +2965,13 @@ void ConnStateData::clientReadRequest(const CommIoCbParams &io) { - debugs(33,5,HERE << io.conn << " size " << io.size); + debugs(33,5, io.conn); Must(reading()); reader = NULL; /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ - if (io.flag == COMM_ERR_CLOSING) { - debugs(33,5, HERE << io.conn << " closing Bailout."); + debugs(33,5, io.conn << " closing Bailout."); return; } @@ -2999,49 +2979,60 @@ assert(io.conn->fd == clientConnection->fd); /* - * Don't reset the timeout value here. The timeout value will be - * set to Config.Timeout.request by httpAccept() and - * clientWriteComplete(), and should apply to the request as a - * whole, not individual read() calls. Plus, it breaks our - * lame half-close detection + * Don't reset the timeout value here. The value should be + * counting Config.Timeout.request and applies to the request + * as a whole, not individual read() calls. + * Plus, it breaks our lame *HalfClosed() detection */ - if (connReadWasError(io.flag, io.size, io.xerrno)) { - notifyAllContexts(io.xerrno); + + CommIoCbParams rd(this); // will be expanded with ReadNow results + rd.conn = io.conn; + switch (Comm::ReadNow(rd, in.buf)) + { + case COMM_INPROGRESS: + if (in.buf.isEmpty()) + debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno)); + readSomeData(); + return; + + case COMM_OK: + kb_incr(&(statCounter.client_http.kbytes_in), rd.size); + // may comm_close or setReplyToError + if (!handleReadData()) + return; + + /* Continue to process previously read data */ + break; + + case COMM_EOF: // close detected by 0-byte read + debugs(33, 5, io.conn << " closed?"); + + if (connFinishedWithConn(rd.size)) { + clientConnection->close(); + return; + } + + /* It might be half-closed, we can't tell */ + fd_table[io.conn->fd].flags.socket_eof = true; + commMarkHalfClosed(io.conn->fd); + fd_note(io.conn->fd, "half-closed"); + + /* There is one more close check at the end, to detect aborted + * (partial) requests. At this point we can't tell if the request + * is partial. + */ + + /* Continue to process previously read data */ + break; + + // case COMM_ERROR: + default: // no other flags should ever occur + debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno)); + notifyAllContexts(rd.xerrno); io.conn->close(); return; } - if (io.flag == COMM_OK) { - if (io.size > 0) { - kb_incr(&(statCounter.client_http.kbytes_in), io.size); - - // may comm_close or setReplyToError - if (!handleReadData(io.buf2)) - return; - - } else if (io.size == 0) { - debugs(33, 5, HERE << io.conn << " closed?"); - - if (connFinishedWithConn(io.size)) { - clientConnection->close(); - return; - } - - /* It might be half-closed, we can't tell */ - fd_table[io.conn->fd].flags.socket_eof = true; - - commMarkHalfClosed(io.conn->fd); - - fd_note(io.conn->fd, "half-closed"); - - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - /* Continue to process previously read data */ - } - } - /* Process next request */ if (getConcurrentRequestCount() == 0) fd_note(io.fd, "Reading next request"); @@ -3077,10 +3068,8 @@ * \retval true we did not call comm_close or setReplyToError */ bool -ConnStateData::handleReadData(SBuf *buf) +ConnStateData::handleReadData() { - assert(buf == &in.buf); // XXX: make this abort the transaction if this fails - // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) return handleRequestBodyData(); @@ -3631,8 +3620,9 @@ // fake a CONNECT request to force connState to tunnel static char ip[MAX_IPSTRLEN]; connState->clientConnection->local.toUrl(ip, sizeof(ip)); + // XXX need to *pre-pend* this fake request to the TLS bits already in the buffer connState->in.buf.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n"); - bool ret = connState->handleReadData(&connState->in.buf); + bool ret = connState->handleReadData(); if (ret) ret = connState->clientParseRequests(); @@ -4298,7 +4288,7 @@ ConnStateData::stopReading() { if (reading()) { - comm_read_cancel(clientConnection->fd, reader); + Comm::ReadCancel(clientConnection->fd, reader); reader = NULL; } } @@ -4498,15 +4488,14 @@ typedef CommCbMemFunT Dialer; pinning.readHandler = JobCallback(33, 3, Dialer, this, ConnStateData::clientPinnedConnectionRead); - static char unusedBuf[8]; - comm_read(pinning.serverConnection, unusedBuf, sizeof(unusedBuf), pinning.readHandler); + Comm::Read(pinning.serverConnection, pinning.readHandler); } void ConnStateData::stopPinnedConnectionMonitoring() { if (pinning.readHandler != NULL) { - comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler); + Comm::ReadCancel(pinning.serverConnection->fd, pinning.readHandler); pinning.readHandler = NULL; } } === modified file 'src/client_side.h' --- src/client_side.h 2014-03-30 12:00:34 +0000 +++ src/client_side.h 2014-06-05 08:28:20 +0000 @@ -290,7 +290,7 @@ virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); - bool handleReadData(SBuf *buf); + bool handleReadData(); bool handleRequestBodyData(); /** @@ -385,7 +385,6 @@ void clientPinnedConnectionRead(const CommIoCbParams &io); private: - int connReadWasError(comm_err_t flag, int size, int xerrno); int connFinishedWithConn(int size); void clientAfterReadingRequests(); bool concurrentRequestQueueFilled() const; === modified file 'src/comm.cc' --- src/comm.cc 2014-04-30 10:50:09 +0000 +++ src/comm.cc 2014-06-05 08:28:20 +0000 @@ -39,6 +39,7 @@ #include "comm/Connection.h" #include "comm/IoCallback.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommRead.h" @@ -80,7 +81,6 @@ * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything. */ -static void commStopHalfClosedMonitor(int fd); static IOCB commHalfClosedReader; static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI); @@ -115,116 +115,6 @@ } /** - * Attempt a read - * - * If the read attempt succeeds or fails, call the callback. - * Else, wait for another IO notification. - */ -void -commHandleRead(int fd, void *data) -{ - Comm::IoCallback *ccb = (Comm::IoCallback *) data; - - assert(data == COMMIO_FD_READCB(fd)); - assert(ccb->active()); - /* Attempt a read */ - ++ statCounter.syscalls.sock.reads; - errno = 0; - int retval; - if (ccb->buf) { - retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); - debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); - } else { - assert(ccb->buf2 != NULL); - SBuf::size_type sz = ccb->buf2->spaceSize(); - char *buf = ccb->buf2->rawSpace(sz); - retval = FD_READ_METHOD(fd, buf, sz-1); // blocking synchronous read(2) - if (retval > 0) { - ccb->buf2->append(buf, retval); - } - debugs(5, 3, "SBuf FD " << fd << ", size " << sz << ", retval " << retval << ", errno " << errno); - } - - if (retval < 0 && !ignoreErrno(errno)) { - debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); - ccb->offset = 0; - ccb->finish(COMM_ERROR, errno); - return; - }; - - /* See if we read anything */ - /* Note - read 0 == socket EOF, which is a valid read */ - if (retval >= 0) { - fd_bytes(fd, retval, FD_READ); - ccb->offset = retval; - ccb->finish(COMM_OK, errno); - return; - } - - /* Nope, register for some more IO */ - Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0); -} - -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ -void -comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) -{ - debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); - - /* Make sure we are open and not closing */ - assert(Comm::IsConnOpen(conn)); - assert(!fd_table[conn->fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); - - // Make sure we are either not reading or just passively monitoring. - // Active/passive conflicts are OK and simply cancel passive monitoring. - if (ccb->active()) { - // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[conn->fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(conn->fd); - assert(!ccb->active()); - } - ccb->conn = conn; - - /* Queue the read */ - ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); - Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); -} - -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ -void -comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback) -{ - debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); - - /* Make sure we are open and not closing */ - assert(Comm::IsConnOpen(conn)); - assert(!fd_table[conn->fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); - - // Make sure we are either not reading or just passively monitoring. - // Active/passive conflicts are OK and simply cancel passive monitoring. - if (ccb->active()) { - // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[conn->fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(conn->fd); - assert(!ccb->active()); - } - ccb->conn = conn; - ccb->buf2 = &buf; - - /* Queue the read */ - ccb->setCallback(Comm::IOCB_READ, callback, NULL, NULL, buf.spaceSize()); - Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); -} - -/** * Empty the read buffers * * This is a magical routine that empties the read buffers. @@ -246,115 +136,6 @@ } /** - * Return whether the FD has a pending completed callback. - * NP: does not work. - */ -int -comm_has_pending_read_callback(int fd) -{ - assert(isOpen(fd)); - // XXX: We do not know whether there is a read callback scheduled. - // This is used for pconn management that should probably be more - // tightly integrated into comm to minimize the chance that a - // closing pconn socket will be used for a new transaction. - return false; -} - -// Does comm check this fd for read readiness? -// Note that when comm is not monitoring, there can be a pending callback -// call, which may resume comm monitoring once fired. -bool -comm_monitors_read(int fd) -{ - assert(isOpen(fd) && COMMIO_FD_READCB(fd)); - // Being active is usually the same as monitoring because we always - // start monitoring the FD when we configure Comm::IoCallback for I/O - // and we usually configure Comm::IoCallback for I/O when we starting - // monitoring a FD for reading. - return COMMIO_FD_READCB(fd)->active(); -} - -/** - * Cancel a pending read. Assert that we have the right parameters, - * and that there are no pending read events! - * - * XXX: We do not assert that there are no pending read events and - * with async calls it becomes even more difficult. - * The whole interface should be reworked to do callback->cancel() - * instead of searching for places where the callback may be stored and - * updating the state of those places. - * - * AHC Don't call the comm handlers? - */ -void -comm_read_cancel(int fd, IOCB *callback, void *data) -{ - if (!isOpen(fd)) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed"); - return; - } - - Comm::IoCallback *cb = COMMIO_FD_READCB(fd); - // TODO: is "active" == "monitors FD"? - if (!cb->active()) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); - return; - } - - typedef CommCbFunPtrCallT Call; - Call *call = dynamic_cast(cb->callback.getRaw()); - if (!call) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback"); - return; - } - - call->cancel("old comm_read_cancel"); - - typedef CommIoCbParams Params; - const Params ¶ms = GetCommParams(cb->callback); - - /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(call->dialer.handler == callback); - assert(params.data == data); - - /* Delete the callback */ - cb->cancel("old comm_read_cancel"); - - /* And the IO event */ - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); -} - -void -comm_read_cancel(int fd, AsyncCall::Pointer &callback) -{ - callback->cancel("comm_read_cancel"); - - if (!isOpen(fd)) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed"); - return; - } - - Comm::IoCallback *cb = COMMIO_FD_READCB(fd); - - if (!cb->active()) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); - return; - } - - AsyncCall::Pointer call = cb->callback; - assert(call != NULL); // XXX: should never fail (active() checks for callback==NULL) - - /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(call == callback); - - /* Delete the callback */ - cb->cancel("comm_read_cancel"); - - /* And the IO event */ - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); -} - -/** * synchronous wrapper around udp socket functions */ int @@ -1886,7 +1667,7 @@ if (!fd_table[c->fd].halfClosedReader) { // not reading already AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader", CommIoCbPtrFun(&commHalfClosedReader, NULL)); - comm_read(c, NULL, 0, call); + Comm::Read(c, call); fd_table[c->fd].halfClosedReader = call; } else c->fd = -1; // XXX: temporary. prevent c replacement erase closing listed FD @@ -1905,7 +1686,7 @@ } /// stop waiting for possibly half-closed connection to close -static void +void commStopHalfClosedMonitor(int const fd) { debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed); === modified file 'src/comm.h' --- src/comm.h 2014-03-15 02:30:08 +0000 +++ src/comm.h 2014-06-05 08:28:20 +0000 @@ -77,12 +77,6 @@ void comm_remove_close_handler(int fd, CLCB *, void *); void comm_remove_close_handler(int fd, AsyncCall::Pointer &); -int comm_has_pending_read_callback(int fd); -bool comm_monitors_read(int fd); -void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); -void comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback); -void comm_read_cancel(int fd, IOCB *callback, void *data); -void comm_read_cancel(int fd, AsyncCall::Pointer &callback); int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from); int comm_udp_recv(int fd, void *buf, size_t len, int flags); ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags); === modified file 'src/comm/IoCallback.cc' --- src/comm/IoCallback.cc 2014-03-15 02:30:08 +0000 +++ src/comm/IoCallback.cc 2014-06-05 08:28:20 +0000 @@ -89,7 +89,6 @@ Comm::IoCallback::reset() { conn = NULL; - buf2 = NULL; // we do not own this buffer. if (freefunc) { freefunc(buf); buf = NULL; @@ -121,7 +120,6 @@ Params ¶ms = GetCommParams(callback); if (conn != NULL) params.fd = conn->fd; // for legacy write handlers... params.conn = conn; - params.buf2 = buf2; params.buf = buf; params.size = offset; params.flag = code; === modified file 'src/comm/IoCallback.h' --- src/comm/IoCallback.h 2014-03-15 11:42:55 +0000 +++ src/comm/IoCallback.h 2014-06-05 08:28:20 +0000 @@ -25,14 +25,6 @@ iocb_type type; Comm::ConnectionPointer conn; AsyncCall::Pointer callback; - - /// Buffer to store read(2) into when set. - // This is a pointer to the Jobs buffer rather than an SBuf using - // the same store since we cannot know when or how the Job will - // alter its SBuf while we are reading. - SBuf *buf2; - - // Legacy c-string buffers used when buf2 is unset. char *buf; FREE *freefunc; int size; === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2012-03-29 09:22:41 +0000 +++ src/comm/Makefile.am 2014-06-05 08:28:20 +0000 @@ -21,6 +21,8 @@ ModPoll.cc \ ModSelect.cc \ ModSelectWin32.cc \ + Read.cc \ + Read.h \ TcpAcceptor.cc \ TcpAcceptor.h \ UdpOpenDialer.h \ === added file 'src/comm/Read.cc' --- src/comm/Read.cc 1970-01-01 00:00:00 +0000 +++ src/comm/Read.cc 2014-06-05 08:28:20 +0000 @@ -0,0 +1,234 @@ +/* + * DEBUG: section 05 Socket Functions + */ +#include "squid.h" +#include "comm.h" +#include "comm_internal.h" +#include "CommCalls.h" +#include "comm/IoCallback.h" +#include "comm/Loops.h" +#include "comm/Read.h" +#include "Debug.h" +#include "fd.h" +#include "fde.h" +#include "SBuf.h" +#include "StatCounters.h" +//#include "tools.h" + +// Does comm check this fd for read readiness? +// Note that when comm is not monitoring, there can be a pending callback +// call, which may resume comm monitoring once fired. +bool +Comm::MonitorsRead(int fd) +{ + assert(isOpen(fd) && COMMIO_FD_READCB(fd)); + // Being active is usually the same as monitoring because we always + // start monitoring the FD when we configure Comm::IoCallback for I/O + // and we usually configure Comm::IoCallback for I/O when we starting + // monitoring a FD for reading. + return COMMIO_FD_READCB(fd)->active(); +} + +void +Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) +{ + // TODO: move comm_read_base() internals into here + // when comm_read() char* API is no longer needed + comm_read_base(conn, NULL, 0, callback); +} + +/** + * Queue a read. + * If a buffer is given the callback is scheduled when the read + * completes, on error, or on file descriptor close. + * + * If no buffer (NULL) is given the callback is scheduled when + * the socket FD is ready for a read(2)/recv(2). + */ +void +comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) +{ + debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); + + /* Make sure we are open and not closing */ + assert(Comm::IsConnOpen(conn)); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); + + // Make sure we are either not reading or just passively monitoring. + // Active/passive conflicts are OK and simply cancel passive monitoring. + if (ccb->active()) { + // if the assertion below fails, we have an active comm_read conflict + assert(fd_table[conn->fd].halfClosedReader != NULL); + commStopHalfClosedMonitor(conn->fd); + assert(!ccb->active()); + } + ccb->conn = conn; + + /* Queue the read */ + ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); + Comm::SetSelect(conn->fd, COMM_SELECT_READ, Comm::HandleRead, ccb, 0); +} + +comm_err_t +Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) +{ + /* Attempt a read */ + ++ statCounter.syscalls.sock.reads; + const SBuf::size_type sz = buf.spaceSize(); + char *theBuf = buf.rawSpace(sz); + errno = 0; + const int retval = FD_READ_METHOD(params.conn->fd, theBuf, sz); + params.xerrno = errno; + + debugs(5, 3, params.conn << ", size " << sz << ", retval " << retval << ", errno " << params.xerrno); + + if (retval > 0) { // data read most common case + buf.append(theBuf, retval); + fd_bytes(params.conn->fd, retval, FD_READ); + params.flag = COMM_OK; + params.size = retval; + + } else if (retval == 0) { // remote closure (somewhat less) common + // Note - read 0 == socket EOF, which is a valid read. + params.flag = COMM_EOF; + + } else if (retval < 0) { // connection errors are worst-case + debugs(5, 3, params.conn << " COMM_ERROR: " << xstrerr(params.xerrno)); + if (ignoreErrno(params.xerrno)) + params.flag = COMM_INPROGRESS; + else + params.flag = COMM_ERROR; + } + + return params.flag; +} + +/** + * Handle an FD which is ready for read(2). + * + * If there is no provided buffer to fill call the callback. + * + * Otherwise attempt a read into the provided buffer. + * If the read attempt succeeds or fails, call the callback. + * Else, wait for another IO notification. + */ +void +Comm::HandleRead(int fd, void *data) +{ + Comm::IoCallback *ccb = (Comm::IoCallback *) data; + + assert(data == COMMIO_FD_READCB(fd)); + assert(ccb->active()); + + // without a buffer, just call back + if (!ccb->buf) { + ccb->finish(COMM_OK, 0); + return; + } + + /* For legacy callers : Attempt a read */ + // Keep in sync with Comm::ReadNow()! + ++ statCounter.syscalls.sock.reads; + errno = 0; + int retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); + debugs(5, 3, "FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); + + /* See if we read anything */ + /* Note - read 0 == socket EOF, which is a valid read */ + if (retval >= 0) { + fd_bytes(fd, retval, FD_READ); + ccb->offset = retval; + ccb->finish(COMM_OK, errno); + return; + + } else if (retval < 0 && !ignoreErrno(errno)) { + debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); + ccb->offset = 0; + ccb->finish(COMM_ERROR, errno); + return; + }; + + + /* Nope, register for some more IO */ + Comm::SetSelect(fd, COMM_SELECT_READ, Comm::HandleRead, data, 0); +} + +/** + * Cancel a pending read. Assert that we have the right parameters, + * and that there are no pending read events! + * + * XXX: We do not assert that there are no pending read events and + * with async calls it becomes even more difficult. + * The whole interface should be reworked to do callback->cancel() + * instead of searching for places where the callback may be stored and + * updating the state of those places. + * + * AHC Don't call the comm handlers? + */ +void +comm_read_cancel(int fd, IOCB *callback, void *data) +{ + if (!isOpen(fd)) { + debugs(5, 4, "fails: FD " << fd << " closed"); + return; + } + + Comm::IoCallback *cb = COMMIO_FD_READCB(fd); + // TODO: is "active" == "monitors FD"? + if (!cb->active()) { + debugs(5, 4, "fails: FD " << fd << " inactive"); + return; + } + + typedef CommCbFunPtrCallT Call; + Call *call = dynamic_cast(cb->callback.getRaw()); + if (!call) { + debugs(5, 4, "fails: FD " << fd << " lacks callback"); + return; + } + + call->cancel("old comm_read_cancel"); + + typedef CommIoCbParams Params; + const Params ¶ms = GetCommParams(cb->callback); + + /* Ok, we can be reasonably sure we won't lose any data here! */ + assert(call->dialer.handler == callback); + assert(params.data == data); + + /* Delete the callback */ + cb->cancel("old comm_read_cancel"); + + /* And the IO event */ + Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +void +Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) +{ + callback->cancel("comm_read_cancel"); + + if (!isOpen(fd)) { + debugs(5, 4, "fails: FD " << fd << " closed"); + return; + } + + Comm::IoCallback *cb = COMMIO_FD_READCB(fd); + + if (!cb->active()) { + debugs(5, 4, "fails: FD " << fd << " inactive"); + return; + } + + AsyncCall::Pointer call = cb->callback; + + /* Ok, we can be reasonably sure we won't lose any data here! */ + assert(call == callback); + + /* Delete the callback */ + cb->cancel("comm_read_cancel"); + + /* And the IO event */ + Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} === added file 'src/comm/Read.h' --- src/comm/Read.h 1970-01-01 00:00:00 +0000 +++ src/comm/Read.h 2014-06-05 08:28:20 +0000 @@ -0,0 +1,55 @@ +#ifndef _SQUID_COMM_READ_H +#define _SQUID_COMM_READ_H + +#include "base/AsyncCall.h" +#include "CommCalls.h" +#include "comm/forward.h" + +class SBuf; + +namespace Comm +{ + +/** + * Start monitoring for read. + * + * callback is scheduled when the read is possible, + * or on file descriptor close. + */ +void Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback); + +/// whether the FD socket is being monitored for read +bool MonitorsRead(int fd); + +/** + * Perform a read(2) on a connection immediately. + * + * The returned flag is also placed in params.flag. + * + * \retval COMM_OK data has been read and placed in buf, amount in params.size + * \retval COMM_ERROR an error occured, the code is placed in params.xerrno + * \retval COMM_INPROGRESS unable to read at this time, or a minor error occured + * \retval COMM_ERR_CLOSING 0-byte read has occured. + * Usually indicates the remote end has disconnected. + */ +comm_err_t ReadNow(CommIoCbParams ¶ms, SBuf &buf); + +/// Cancel the read pending on FD. No action if none pending. +void ReadCancel(int fd, AsyncCall::Pointer &callback); + +/// callback handler to process an FD which is available for reading +extern PF HandleRead; + +} // namespace Comm + +// Legacy API to be removed +void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); +inline void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) +{ + assert(buf != NULL); + comm_read_base(conn, buf, len, callback); +} +void comm_read_cancel(int fd, IOCB *callback, void *data); +inline void comm_read_cancel(int fd, AsyncCall::Pointer &callback) {Comm::ReadCancel(fd,callback);} + +#endif /* _SQUID_COMM_READ_H */ === modified file 'src/comm/comm_err_t.h' --- src/comm/comm_err_t.h 2011-07-12 05:52:07 +0000 +++ src/comm/comm_err_t.h 2014-06-05 08:28:20 +0000 @@ -13,6 +13,7 @@ COMM_ERR_DNS = -9, COMM_ERR_CLOSING = -10, COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ + COMM_EOF = -12, /**< read(2) returned success, but with 0 bytes */ COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ } comm_err_t; === modified file 'src/comm/comm_internal.h' --- src/comm/comm_internal.h 2012-09-21 14:57:30 +0000 +++ src/comm/comm_internal.h 2014-06-05 08:28:20 +0000 @@ -12,5 +12,6 @@ extern fd_debug_t *fdd_table; bool isOpen(const int fd); +void commStopHalfClosedMonitor(int fd); #endif === modified file 'src/dns_internal.cc' --- src/dns_internal.cc 2014-06-02 07:19:35 +0000 +++ src/dns_internal.cc 2014-06-05 08:28:20 +0000 @@ -36,6 +36,7 @@ #include "comm/Connection.h" #include "comm/ConnOpener.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/Write.h" #include "dlink.h" #include "event.h" === modified file 'src/fde.cc' --- src/fde.cc 2014-04-30 10:50:09 +0000 +++ src/fde.cc 2014-06-05 08:28:20 +0000 @@ -32,7 +32,7 @@ */ #include "squid.h" -#include "comm.h" +#include "comm/Read.h" #include "fde.h" #include "globals.h" #include "SquidTime.h" @@ -44,7 +44,7 @@ fde::readPending(int fdNumber) { if (type == FD_SOCKET) - return comm_monitors_read(fdNumber); + return Comm::MonitorsRead(fdNumber); return read_handler ? true : false ; } === modified file 'src/ftp.cc' --- src/ftp.cc 2014-06-02 07:19:35 +0000 +++ src/ftp.cc 2014-06-05 08:28:20 +0000 @@ -34,6 +34,7 @@ #include "acl/FilledChecklist.h" #include "comm.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommCalls.h" === modified file 'src/gopher.cc' --- src/gopher.cc 2014-03-31 06:57:27 +0000 +++ src/gopher.cc 2014-06-05 08:28:20 +0000 @@ -31,6 +31,7 @@ #include "squid.h" #include "comm.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "fd.h" === modified file 'src/helper.cc' --- src/helper.cc 2014-03-04 10:33:08 +0000 +++ src/helper.cc 2014-06-05 08:28:20 +0000 @@ -34,6 +34,7 @@ #include "base/AsyncCbdataCalls.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" === modified file 'src/ident/Ident.cc' --- src/ident/Ident.cc 2013-10-25 00:13:46 +0000 +++ src/ident/Ident.cc 2014-06-05 08:28:20 +0000 @@ -35,6 +35,7 @@ #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/Write.h" #include "CommCalls.h" #include "globals.h" === modified file 'src/ipc/Port.cc' --- src/ipc/Port.cc 2014-01-27 05:27:41 +0000 +++ src/ipc/Port.cc 2014-06-05 08:28:20 +0000 @@ -6,6 +6,7 @@ #include "squid.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "CommCalls.h" #include "globals.h" #include "ipc/Port.h" === modified file 'src/pconn.cc' --- src/pconn.cc 2014-04-30 10:50:09 +0000 +++ src/pconn.cc 2014-06-05 08:28:20 +0000 @@ -34,6 +34,7 @@ #include "CachePeer.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "fd.h" #include "fde.h" #include "globals.h" === modified file 'src/store.cc' --- src/store.cc 2014-02-21 10:46:19 +0000 +++ src/store.cc 2014-06-05 08:28:20 +0000 @@ -35,6 +35,7 @@ #include "CacheDigest.h" #include "CacheManager.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "ETag.h" #include "event.h" #include "fde.h" === modified file 'src/tests/stub_client_side.cc' --- src/tests/stub_client_side.cc 2014-03-30 12:00:34 +0000 +++ src/tests/stub_client_side.cc 2014-06-05 08:28:20 +0000 @@ -51,7 +51,7 @@ void ConnStateData::expectNoForwarding() STUB void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB -bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false) +bool ConnStateData::handleReadData() STUB_RETVAL(false) bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false) void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB void ConnStateData::unpinConnection() STUB === modified file 'src/tests/stub_libcomm.cc' --- src/tests/stub_libcomm.cc 2013-03-26 10:38:20 +0000 +++ src/tests/stub_libcomm.cc 2014-06-05 08:28:20 +0000 @@ -48,6 +48,16 @@ comm_err_t Comm::DoSelect(int) STUB_RETVAL(COMM_ERROR) void Comm::QuickPollRequired(void) STUB +#include "comm/Read.h" +void Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) STUB +bool Comm::MonitorsRead(int fd) STUB_RETVAL(false) +comm_err_t Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) STUB_RETVAL(COMM_ERROR) +void Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) STUB +//void Comm::HandleRead(int, void*) STUB + +void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) STUB +void comm_read_cancel(int fd, IOCB *callback, void *data) STUB + #include "comm/TcpAcceptor.h" //Comm::TcpAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub) STUB void Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) STUB === modified file 'src/tunnel.cc' --- src/tunnel.cc 2014-06-03 08:52:29 +0000 +++ src/tunnel.cc 2014-06-05 08:28:20 +0000 @@ -40,6 +40,7 @@ #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "fde.h" === modified file 'src/whois.cc' --- src/whois.cc 2014-06-02 07:19:35 +0000 +++ src/whois.cc 2014-06-05 08:28:20 +0000 @@ -33,6 +33,7 @@ #include "squid.h" #include "comm.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "FwdState.h"