------------------------------------------------------------ revno: 13324 [merge] revision-id: squid3@treenet.co.nz-20140324045732-5u3nk1n4k8jtcrqo parent: chtsanti@users.sourceforge.net-20140323102852-d1h3u2nsrn258uig parent: squid3@treenet.co.nz-20140323025612-fzuiffenkl5rsjyb committer: Amos Jeffries branch nick: trunk timestamp: Sun 2014-03-23 21:57:32 -0700 message: Parser-NG: Convert the ConnStateData input buffer to SBuf Prepare the way to efficiently parse client requests using SBuf based parser-ng. IoCallback stores a raw-pointer to the ConnStateData::In::buf member object rather than an SBuf reference to the backing MemBlob or char* store so that only the short (blocking) FD_READ_METHOD() call needs to provide any synchronous guarantees. We also particularly need a direct (raw) pointer to the ConnStateData member to prevent the possible read/consume collisions causing problems with the ConnStateData callback and avoid having to merge two separate SBuf. ------------------------------------------------------------ Use --include-merges or -n0 to see merged revisions. 
------------------------------------------------------------ # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: squid3@treenet.co.nz-20140324045732-5u3nk1n4k8jtcrqo # target_branch: http://bzr.squid-cache.org/bzr/squid3/trunk/ # testament_sha1: 58c98f4072d0a7a7f57c3cd6f28adbd3222f253a # timestamp: 2014-03-24 05:11:56 +0000 # source_branch: http://bzr.squid-cache.org/bzr/squid3/trunk/ # base_revision_id: chtsanti@users.sourceforge.net-20140323102852-\ # d1h3u2nsrn258uig # # Begin patch === modified file 'src/CommCalls.h' --- src/CommCalls.h 2014-01-28 19:28:23 +0000 +++ src/CommCalls.h 2014-03-15 11:42:55 +0000 @@ -106,6 +106,8 @@ bool syncWithComm(); // see CommCommonCbParams::syncWithComm }; +class SBuf; + // read/write (I/O) parameters class CommIoCbParams: public CommCommonCbParams { @@ -118,6 +120,7 @@ public: char *buf; size_t size; + SBuf *buf2; // alternative buffer for use when buf is unset }; // close parameters === modified file 'src/client_side.cc' --- src/client_side.cc 2014-03-08 17:28:23 +0000 +++ src/client_side.cc 2014-03-15 11:42:55 +0000 @@ -248,12 +248,12 @@ debugs(33, 4, HERE << clientConnection << ": reading request..."); - if (!maybeMakeSpaceAvailable()) + if (!in.maybeMakeSpaceAvailable()) return; typedef CommCbMemFunT Dialer; reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - comm_read(clientConnection, in.addressToReadInto(), getAvailableBufferLength(), reader); + comm_read(clientConnection, in.buf, reader); } void @@ -1561,7 +1561,7 @@ fd_note(clientConnection->fd, "Idle client: Waiting for next request"); /** - * Set the timeout BEFORE calling clientReadRequest(). + * Set the timeout BEFORE calling readSomeData(). 
*/ typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, @@ -1889,7 +1889,7 @@ if (!stoppedReceiving()) { if (const int64_t expecting = mayNeedToReadMoreBody()) { debugs(33, 5, HERE << "must still read " << expecting << - " request body bytes with " << in.notYetUsed << " unused"); + " request body bytes with " << in.buf.length() << " unused"); return; // wait for the request receiver to finish reading } } @@ -1956,7 +1956,7 @@ ClientSocketContext *context; StoreIOBuffer tempBuffer; http = new ClientHttpRequest(csd); - http->req_sz = csd->in.notYetUsed; + http->req_sz = csd->in.buf.length(); http->uri = xstrdup(uri); setLogUri (http, uri); context = new ClientSocketContext(csd->clientConnection, http); @@ -2379,32 +2379,20 @@ return result; } -int -ConnStateData::getAvailableBufferLength() const -{ - assert (in.allocatedSize > in.notYetUsed); // allocated more than used - const size_t result = in.allocatedSize - in.notYetUsed - 1; - // huge request_header_max_size may lead to more than INT_MAX unused space - assert (static_cast(result) <= INT_MAX); - return result; -} - bool -ConnStateData::maybeMakeSpaceAvailable() +ConnStateData::In::maybeMakeSpaceAvailable() { - if (getAvailableBufferLength() < 2) { - size_t newSize; - if (in.allocatedSize >= Config.maxRequestBufferSize) { + if (buf.spaceSize() < 2) { + const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize(); + if (haveCapacity >= Config.maxRequestBufferSize) { debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize); return false; } - if ((newSize=in.allocatedSize * 2) > Config.maxRequestBufferSize) { - newSize=Config.maxRequestBufferSize; - } - in.buf = (char *)memReallocBuf(in.buf, newSize, &in.allocatedSize); - debugs(33, 2, "growing request buffer: notYetUsed=" << in.notYetUsed << " size=" << in.allocatedSize); + const SBuf::size_type wantCapacity = min(Config.maxRequestBufferSize, haveCapacity*2); + 
buf.reserveCapacity(wantCapacity); + debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() << " used=" << buf.length()); } - return true; + return (buf.spaceSize() >= 2); } void @@ -2442,7 +2430,7 @@ if (!ignoreErrno(xerrno)) { debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno)); return 1; - } else if (in.notYetUsed == 0) { + } else if (in.buf.isEmpty()) { debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")"); } } @@ -2454,7 +2442,7 @@ ConnStateData::connFinishedWithConn(int size) { if (size == 0) { - if (getConcurrentRequestCount() == 0 && in.notYetUsed == 0) { + if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) { /* no current or pending requests */ debugs(33, 4, HERE << clientConnection << " closed"); return 1; @@ -2472,26 +2460,19 @@ void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount) { - assert(byteCount > 0 && byteCount <= conn->in.notYetUsed); - conn->in.notYetUsed -= byteCount; - debugs(33, 5, HERE << "conn->in.notYetUsed = " << conn->in.notYetUsed); - /* - * If there is still data that will be used, - * move it to the beginning. 
- */ - - if (conn->in.notYetUsed > 0) - memmove(conn->in.buf, conn->in.buf + byteCount, conn->in.notYetUsed); + assert(byteCount > 0 && byteCount <= conn->in.buf.length()); + conn->in.buf.consume(byteCount); + debugs(33, 5, "conn->in.buf has " << conn->in.buf.length() << " bytes unused."); } /// respond with ERR_TOO_BIG if request header exceeds request_header_max_size void ConnStateData::checkHeaderLimits() { - if (in.notYetUsed < Config.maxRequestHeaderSize) + if (in.buf.length() < Config.maxRequestHeaderSize) return; // can accumulte more header data - debugs(33, 3, "Request header is too large (" << in.notYetUsed << " > " << + debugs(33, 3, "Request header is too large (" << in.buf.length() << " > " << Config.maxRequestHeaderSize << " bytes)"); ClientSocketContext *context = parseHttpRequestAbort(this, "error:request-too-large"); @@ -2642,15 +2623,15 @@ assert (repContext); switch (hp->request_parse_status) { case Http::scHeaderTooLarge: - repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf, NULL); + repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL); break; case Http::scMethodNotAllowed: repContext->setReplyToError(ERR_UNSUP_REQ, Http::scMethodNotAllowed, method, http->uri, - conn->clientConnection->remote, NULL, conn->in.buf, NULL); + conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL); break; default: repContext->setReplyToError(ERR_INVALID_REQ, hp->request_parse_status, method, http->uri, - conn->clientConnection->remote, NULL, conn->in.buf, NULL); + conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL); } assert(context->http->out.offset == 0); context->pullData(); @@ -2900,9 +2881,9 @@ static void connStripBufferWhitespace (ConnStateData * conn) { - while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { - memmove(conn->in.buf, conn->in.buf + 1, 
conn->in.notYetUsed - 1); - -- conn->in.notYetUsed; + // XXX: kill this whole function. + while (!conn->in.buf.isEmpty() && xisspace(conn->in.buf.at(0))) { + conn->in.buf.consume(1); } } @@ -2945,24 +2926,20 @@ // Loop while we have read bytes that are not needed for producing the body // On errors, bodyPipe may become nil, but readMore will be cleared - while (in.notYetUsed > 0 && !bodyPipe && flags.readMore) { + while (!in.buf.isEmpty() && !bodyPipe && flags.readMore) { connStripBufferWhitespace(this); /* Don't try to parse if the buffer is empty */ - if (in.notYetUsed == 0) + if (in.buf.isEmpty()) break; /* Limit the number of concurrent requests */ if (concurrentRequestQueueFilled()) break; - /* Should not be needed anymore */ - /* Terminate the string */ - in.buf[in.notYetUsed] = '\0'; - /* Begin the parsing */ PROF_start(parseHttpRequest); - HttpParserInit(&parser_, in.buf, in.notYetUsed); + HttpParserInit(&parser_, in.buf.c_str(), in.buf.length()); /* Process request */ Http::ProtocolVersion http_ver; @@ -3034,7 +3011,7 @@ kb_incr(&(statCounter.client_http.kbytes_in), io.size); // may comm_close or setReplyToError - if (!handleReadData(io.buf, io.size)) + if (!handleReadData(io.buf2)) return; } else if (io.size == 0) { @@ -3095,16 +3072,9 @@ * \retval true we did not call comm_close or setReplyToError */ bool -ConnStateData::handleReadData(char *buf, size_t size) +ConnStateData::handleReadData(SBuf *buf) { - char *current_buf = in.addressToReadInto(); - - if (buf != current_buf) - memmove(current_buf, buf, size); - - in.notYetUsed += size; - - in.buf[in.notYetUsed] = '\0'; /* Terminate the string */ + assert(buf == &in.buf); // XXX: make this abort the transaction if this fails // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) @@ -3133,7 +3103,7 @@ } } else { // identity encoding debugs(33,5, HERE << "handling plain request body for " << clientConnection); - putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed); + putSize = 
bodyPipe->putMoreData(in.buf.c_str(), in.buf.length()); if (!bodyPipe->mayNeedMoreData()) { // BodyPipe will clear us automagically when we produced everything bodyPipe = NULL; @@ -3163,17 +3133,17 @@ err_type ConnStateData::handleChunkedRequestBody(size_t &putSize) { - debugs(33,7, HERE << "chunked from " << clientConnection << ": " << in.notYetUsed); + debugs(33, 7, "chunked from " << clientConnection << ": " << in.buf.length()); try { // the parser will throw on errors - if (!in.notYetUsed) // nothing to do (MemBuf::init requires this check) + if (in.buf.isEmpty()) // nothing to do return ERR_NONE; MemBuf raw; // ChunkedCodingParser only works with MemBufs // add one because MemBuf will assert if it cannot 0-terminate - raw.init(in.notYetUsed, in.notYetUsed+1); - raw.append(in.buf, in.notYetUsed); + raw.init(in.buf.length(), in.buf.length()+1); + raw.append(in.buf.c_str(), in.buf.length()); const mb_size_t wasContentSize = raw.contentSize(); BodyPipeCheckout bpc(*bodyPipe); @@ -3313,7 +3283,7 @@ log_addr = xact->tcpClient->remote; log_addr.applyMask(Config.Addrs.client_netmask); - in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &in.allocatedSize); + in.buf.reserveCapacity(CLIENT_REQ_BUF_SZ); if (port->disable_pmtu_discovery != DISABLE_PMTU_OFF && (transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) { @@ -3655,10 +3625,10 @@ // fake a CONNECT request to force connState to tunnel static char ip[MAX_IPSTRLEN]; - static char reqStr[MAX_IPSTRLEN + 80]; connState->clientConnection->local.toUrl(ip, sizeof(ip)); - snprintf(reqStr, sizeof(reqStr), "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", ip, ip); - bool ret = connState->handleReadData(reqStr, strlen(reqStr)); + SBuf reqStr; + reqStr.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n"); + bool ret = connState->handleReadData(&reqStr); if (ret) ret = connState->clientParseRequests(); @@ -4350,7 +4320,7 @@ return -1; // probably need to read more, but we cannot be 
sure const int64_t needToProduce = bodyPipe->unproducedSize(); - const int64_t haveAvailable = static_cast(in.notYetUsed); + const int64_t haveAvailable = static_cast(in.buf.length()); if (needToProduce <= haveAvailable) return 0; // we have read what we need (but are waiting for pipe space) @@ -4420,20 +4390,13 @@ in.bodyParser = NULL; } -char * -ConnStateData::In::addressToReadInto() const -{ - return buf + notYetUsed; -} - -ConnStateData::In::In() : bodyParser(NULL), - buf (NULL), notYetUsed (0), allocatedSize (0) +ConnStateData::In::In() : + bodyParser(NULL), + buf() {} ConnStateData::In::~In() { - if (allocatedSize) - memFreeBuf(allocatedSize, buf); delete bodyParser; // TODO: pool } === modified file 'src/client_side.h' --- src/client_side.h 2014-01-05 19:49:23 +0000 +++ src/client_side.h 2014-03-15 02:30:08 +0000 @@ -189,14 +189,12 @@ ~ConnStateData(); void readSomeData(); - int getAvailableBufferLength() const; bool areAllContextsForThisConnection() const; void freeAllContexts(); void notifyAllContexts(const int xerrno); ///< tell everybody about the err /// Traffic parsing bool clientParseRequests(); void readNextRequest(); - bool maybeMakeSpaceAvailable(); ClientSocketContext::Pointer getCurrentContext() const; void addContextToQueue(ClientSocketContext * context); int getConcurrentRequestCount() const; @@ -212,12 +210,10 @@ struct In { In(); ~In(); - char *addressToReadInto() const; + bool maybeMakeSpaceAvailable(); ChunkedCodingParser *bodyParser; ///< parses chunked request body - char *buf; - size_t notYetUsed; - size_t allocatedSize; + SBuf buf; } in; /** number of body bytes we need to comm_read for the "current" request @@ -293,7 +289,7 @@ virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); - bool handleReadData(char *buf, size_t size); + bool handleReadData(SBuf *buf); bool handleRequestBodyData(); /** === modified file 'src/comm.cc' --- src/comm.cc 2014-02-21 10:46:19 +0000 +++ 
src/comm.cc 2014-03-15 02:30:08 +0000 @@ -130,8 +130,19 @@ ++ statCounter.syscalls.sock.reads; errno = 0; int retval; - retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); - debugs(5, 3, "comm_read_try: FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); + if (ccb->buf) { + retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); + debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); + } else { + assert(ccb->buf2 != NULL); + SBuf::size_type sz = ccb->buf2->spaceSize(); + char *buf = ccb->buf2->rawSpace(sz); + retval = FD_READ_METHOD(fd, buf, sz-1); // blocking synchronous read(2) + if (retval > 0) { + ccb->buf2->append(buf, retval); + } + debugs(5, 3, "SBuf FD " << fd << ", size " << sz << ", retval " << retval << ", errno " << errno); + } if (retval < 0 && !ignoreErrno(errno)) { debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); @@ -183,6 +194,36 @@ } /** + * Queue a read. handler/handler_data are called when the read + * completes, on error, or on file descriptor close. + */ +void +comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback) +{ + debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); + + /* Make sure we are open and not closing */ + assert(Comm::IsConnOpen(conn)); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); + + // Make sure we are either not reading or just passively monitoring. + // Active/passive conflicts are OK and simply cancel passive monitoring. 
+ if (ccb->active()) { + // if the assertion below fails, we have an active comm_read conflict + assert(fd_table[conn->fd].halfClosedReader != NULL); + commStopHalfClosedMonitor(conn->fd); + assert(!ccb->active()); + } + ccb->conn = conn; + ccb->buf2 = &buf; + + /* Queue the read */ + ccb->setCallback(Comm::IOCB_READ, callback, NULL, NULL, buf.spaceSize()); + Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); +} + +/** * Empty the read buffers * * This is a magical routine that empties the read buffers. === modified file 'src/comm.h' --- src/comm.h 2014-01-19 05:39:55 +0000 +++ src/comm.h 2014-03-15 02:30:08 +0000 @@ -79,8 +79,8 @@ int comm_has_pending_read_callback(int fd); bool comm_monitors_read(int fd); -//void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, IOCB *handler, void *data); void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); +void comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback); void comm_read_cancel(int fd, IOCB *callback, void *data); void comm_read_cancel(int fd, AsyncCall::Pointer &callback); int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from); === modified file 'src/comm/IoCallback.cc' --- src/comm/IoCallback.cc 2012-08-14 11:53:07 +0000 +++ src/comm/IoCallback.cc 2014-03-15 02:30:08 +0000 @@ -89,6 +89,7 @@ Comm::IoCallback::reset() { conn = NULL; + buf2 = NULL; // we do not own this buffer. if (freefunc) { freefunc(buf); buf = NULL; @@ -109,7 +110,7 @@ assert(active()); /* free data */ - if (freefunc) { + if (freefunc && buf) { freefunc(buf); buf = NULL; freefunc = NULL; @@ -120,6 +121,7 @@ Params &params = GetCommParams(callback); if (conn != NULL) params.fd = conn->fd; // for legacy write handlers... 
params.conn = conn; + params.buf2 = buf2; params.buf = buf; params.size = offset; params.flag = code; === modified file 'src/comm/IoCallback.h' --- src/comm/IoCallback.h 2012-09-21 14:57:30 +0000 +++ src/comm/IoCallback.h 2014-03-15 11:42:55 +0000 @@ -6,6 +6,8 @@ #include "comm_err_t.h" #include "typedefs.h" +class SBuf; + namespace Comm { @@ -23,6 +25,14 @@ iocb_type type; Comm::ConnectionPointer conn; AsyncCall::Pointer callback; + + /// Buffer to store read(2) into when set. + // This is a pointer to the Jobs buffer rather than an SBuf using + // the same store since we cannot know when or how the Job will + // alter its SBuf while we are reading. + SBuf *buf2; + + // Legacy c-string buffers used when buf2 is unset. char *buf; FREE *freefunc; int size; === modified file 'src/stat.cc' --- src/stat.cc 2014-03-17 14:27:13 +0000 +++ src/stat.cc 2014-03-23 02:56:12 +0000 @@ -1880,8 +1880,8 @@ storeAppendPrintf(s, "\tFD %d, read %" PRId64 ", wrote %" PRId64 "\n", fd, fd_table[fd].bytes_read, fd_table[fd].bytes_written); storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc); - storeAppendPrintf(s, "\tin: buf %p, offset %ld, size %ld\n", - conn->in.buf, (long int) conn->in.notYetUsed, (long int) conn->in.allocatedSize); + storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n", + conn->in.buf.c_str(), (long int) conn->in.buf.length(), (long int) conn->in.buf.spaceSize()); storeAppendPrintf(s, "\tremote: %s\n", conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN)); storeAppendPrintf(s, "\tlocal: %s\n", === modified file 'src/tests/stub_client_side.cc' --- src/tests/stub_client_side.cc 2014-01-05 19:49:23 +0000 +++ src/tests/stub_client_side.cc 2014-03-15 02:30:08 +0000 @@ -29,19 +29,16 @@ void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB void ConnStateData::readSomeData() STUB -int ConnStateData::getAvailableBufferLength() const STUB_RETVAL(0) bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false) void 
ConnStateData::freeAllContexts() STUB void ConnStateData::notifyAllContexts(const int xerrno) STUB bool ConnStateData::clientParseRequests() STUB_RETVAL(false) void ConnStateData::readNextRequest() STUB -bool ConnStateData::maybeMakeSpaceAvailable() STUB_RETVAL(false) void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0) bool ConnStateData::isOpen() const STUB_RETVAL(false) void ConnStateData::checkHeaderLimits() STUB void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB -char *ConnStateData::In::addressToReadInto() const STUB_RETVAL(NULL) int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0) #if USE_AUTH void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB @@ -54,7 +51,7 @@ void ConnStateData::expectNoForwarding() STUB void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB -bool ConnStateData::handleReadData(char *buf, size_t size) STUB_RETVAL(false) +bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false) bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false) void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB void ConnStateData::unpinConnection() STUB @@ -76,6 +73,8 @@ bool ConnStateData::serveDelayedError(ClientSocketContext *context) STUB_RETVAL(false) #endif +bool ConnStateData::In::maybeMakeSpaceAvailable() STUB_RETVAL(false) + void setLogUri(ClientHttpRequest * http, char const *uri, bool cleanUrl) STUB const char *findTrailingHTTPVersion(const char *uriAndHTTPVersion, const char *end) STUB_RETVAL(NULL) int varyEvaluateMatch(StoreEntry * entry, HttpRequest * req) STUB_RETVAL(0)