------------------------------------------------------------ revno: 13924 revision-id: squid3@treenet.co.nz-20150927000303-cdbt2mg2uleslxmy parent: squid3@treenet.co.nz-20150927000109-d1v9jneh61kuetep author: Alex Rousskov committer: Amos Jeffries branch nick: 3.5 timestamp: Sat 2015-09-26 17:03:03 -0700 message: SMP: register worker listening ports one by one When operating with many listening ports workers can flood the UDS queue buffers and run into a timeout waiting for the coordinator to respond. To prevent that we for a queue and wait for each port to get a response before registering the next. ------------------------------------------------------------ # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: squid3@treenet.co.nz-20150927000303-cdbt2mg2uleslxmy # target_branch: http://bzr.squid-cache.org/bzr/squid3/3.5 # testament_sha1: 45ffa85f52201eab0ea68e29dfa8126b754d3e55 # timestamp: 2015-09-27 00:15:11 +0000 # source_branch: http://bzr.squid-cache.org/bzr/squid3/3.5 # base_revision_id: squid3@treenet.co.nz-20150927000109-\ # d1v9jneh61kuetep # # Begin patch === modified file 'src/ipc/SharedListen.cc' --- src/ipc/SharedListen.cc 2015-01-13 09:13:49 +0000 +++ src/ipc/SharedListen.cc 2015-09-27 00:03:03 +0000 @@ -21,6 +21,7 @@ #include "ipc/TypedMsgHdr.h" #include "tools.h" +#include #include /// holds information necessary to handle JoinListen response @@ -35,6 +36,10 @@ typedef std::map SharedListenRequestMap; static SharedListenRequestMap TheSharedListenRequestMap; +/// accumulates delayed requests until they are ready to be sent, in FIFO order +typedef std::list DelayedSharedListenRequests; +static DelayedSharedListenRequests TheDelayedRequests; + static int AddToMap(const PendingOpenRequest &por) { @@ -106,31 +111,59 @@ hdrMsg.putFd(fd); } -void Ipc::JoinSharedListen(const OpenListenerParams ¶ms, - AsyncCall::Pointer &callback) +static void +SendSharedListenRequest(const PendingOpenRequest &por) { - PendingOpenRequest por; - por.params = params; - por.callback = callback; - - SharedListenRequest request; + Ipc::SharedListenRequest request; request.requestorId = KidIdentifier; request.params = por.params; request.mapId = AddToMap(por); - debugs(54, 3, HERE << "getting listening FD for " << request.params.addr << + debugs(54, 3, "getting listening FD for " << request.params.addr << " mapId=" << request.mapId); - TypedMsgHdr message; + Ipc::TypedMsgHdr message; request.pack(message); SendMessage(Ipc::Port::CoordinatorAddr(), message); } +static void +kickDelayedRequest() +{ + if (TheDelayedRequests.empty()) + return; // no pending requests to resume + + debugs(54, 3, "resuming with " << TheSharedListenRequestMap.size() << + " active + " << TheDelayedRequests.size() << " delayed requests"); + + SendSharedListenRequest(*TheDelayedRequests.begin()); + TheDelayedRequests.pop_front(); +} + +void +Ipc::JoinSharedListen(const OpenListenerParams ¶ms, AsyncCall::Pointer &cb) +{ + PendingOpenRequest por; + por.params = params; + por.callback = cb; + + const DelayedSharedListenRequests::size_type concurrencyLimit = 1; + if (TheSharedListenRequestMap.size() >= concurrencyLimit) { + debugs(54, 3, "waiting for " << TheSharedListenRequestMap.size() << + " active + " << TheDelayedRequests.size() << " delayed requests"); + TheDelayedRequests.push_back(por); + } else { + SendSharedListenRequest(por); + } +} + void Ipc::SharedListenJoined(const SharedListenResponse &response) { // Dont debugs c fully since only FD is filled right now. - debugs(54, 3, HERE << "got listening FD " << response.fd << " errNo=" << - response.errNo << " mapId=" << response.mapId); + debugs(54, 3, "got listening FD " << response.fd << " errNo=" << + response.errNo << " mapId=" << response.mapId << " with " << + TheSharedListenRequestMap.size() << " active + " << + TheDelayedRequests.size() << " delayed requests"); Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end()); PendingOpenRequest por = TheSharedListenRequestMap[response.mapId]; @@ -158,5 +191,7 @@ cbd->errNo = response.errNo; cbd->handlerSubscription = por.params.handlerSubscription; ScheduleCallHere(por.callback); + + kickDelayedRequest(); }