!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache. PHP/5.6.40 

uname -a: Linux cpanel06wh.bkk1.cloud.z.com 2.6.32-954.3.5.lve1.4.80.el6.x86_64 #1 SMP Thu Sep 24
01:42:00 EDT 2020 x86_64
 

uid=851(cp949260) gid=853(cp949260) groups=853(cp949260) 

Safe-mode: OFF (not secure)

/opt/passenger-5.3.7-4.el6.cloudlinux/src/cxx_supportlib/ServerKit/   drwxr-xr-x
Free 234.17 GB of 981.82 GB (23.85%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     AcceptLoadBalancer.h (8.88 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
/*
 *  Phusion Passenger - https://www.phusionpassenger.com/
 *  Copyright (c) 2014-2018 Phusion Holding B.V.
 *
 *  "Passenger", "Phusion Passenger" and "Union Station" are registered
 *  trademarks of Phusion Holding B.V.
 *
 *  Permission is hereby granted, free of charge, to any person obtaining a copy
 *  of this software and associated documentation files (the "Software"), to deal
 *  in the Software without restriction, including without limitation the rights
 *  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 *  copies of the Software, and to permit persons to whom the Software is
 *  furnished to do so, subject to the following conditions:
 *
 *  The above copyright notice and this permission notice shall be included in
 *  all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 *  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 *  THE SOFTWARE.
 */
#ifndef _PASSENGER_SERVER_KIT_ACCEPT_LOAD_BALANCER_H_
#define _PASSENGER_SERVER_KIT_ACCEPT_LOAD_BALANCER_H_

#include <boost/bind.hpp>
#include <boost/cstdint.hpp>
#include <oxt/thread.hpp>
#include <oxt/macros.hpp>
#include <vector>
#include <cassert>
#include <cerrno>

#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <poll.h>

#include <Constants.h>
#include <LoggingKit/LoggingKit.h>
#include <Utils.h>
#include <IOTools/IOUtils.h>

namespace Passenger {
namespace ServerKit {

using namespace std;
using namespace boost;


/**
 * Listens for client connections and load balances them to multiple
 * Server objects in a round-robin manner.
 *
 * Normally, the Server class listens for client connections directly.
 * But this is inefficient in multithreaded situations where you are
 * running one Server and event loop per CPU core, that all happen to
 * listen on the same server socket. This is because every time a client
 * connects, all threads wake up, but only one thread will succeed in
 * accept()ing the client.
 *
 * Furthermore, it can also be very easy for threads to become
 * unbalanced. If a burst of clients connect to the server socket,
 * then it is very likely that a single Server accepts all of
 * those clients. This can result in situations where, for example,
 * thread 1 has 40 clients and thread 2 has only 3.
 *
 * The AcceptLoadBalancer solves this problem by being the sole entity
 * that listens on the server socket. All client sockets that it
 * accepts are distributed to all registered Server objects, in a
 * round-robin manner.
 *
 * Inside the "PassengerAgent core", we activate AcceptLoadBalancer
 * only if `core_threads > 1`, which is often the case because
 * `core_threads` defaults to the number of CPU cores.
 */
template<typename Server>
class AcceptLoadBalancer {
private:
    static const unsigned int ACCEPT_BURST_COUNT = 16;

    int endpoints[SERVER_KIT_MAX_SERVER_ENDPOINTS];
    struct pollfd pollers[1 + SERVER_KIT_MAX_SERVER_ENDPOINTS];
    int newClients[ACCEPT_BURST_COUNT];

    unsigned int nEndpoints;
    boost::uint8_t newClientCount;
    boost::uint8_t nextServer;
    bool accept4Available;
    bool quit;

    int exitPipe[2];
    oxt::thread *thread;

    void pollAllEndpoints() {
        pollers[0].fd = exitPipe[0];
        pollers[0].events = POLLIN;
        for (unsigned int i = 0; i < nEndpoints; i++) {
            pollers[i + 1].fd = endpoints[i];
            pollers[i + 1].events = POLLIN;
        }

        int ret;
        do {
            ret = poll(pollers, nEndpoints + 1, -1) == -1;
        } while (ret == -1 && pollErrnoShouldRetry(errno));
        if (ret == -1) {
            int e = errno;
            throw SystemException("poll() failed", e);
        }
    }

    bool pollErrnoShouldRetry(int e) const {
        return e == EAGAIN
            || e == EWOULDBLOCK
            || e == EINTR;
    }

    bool acceptNewClients(int endpoint) {
        bool error = false;
        int fd, errcode = 0;

        while (newClientCount < ACCEPT_BURST_COUNT) {
            fd = acceptNonBlockingSocket(endpoint);
            if (fd == -1) {
                error = true;
                errcode = errno;
                break;
            }

            P_TRACE(2, "Accepted client file descriptor: " << fd);
            newClients[newClientCount] = fd;
            newClientCount++;
        }

        if (error && errcode != EAGAIN && errcode != EWOULDBLOCK) {
            P_ERROR("Cannot accept client: " << getErrorDesc(errcode) <<
                " (errno=" << errcode << "). " <<
                "Stop accepting clients for 3 seconds.");
            pollers[0].fd = exitPipe[0];
            pollers[0].events = POLLIN;
            if (poll(pollers, 1, 3000) == 1) {
                quit = true;
            } else {
                P_NOTICE("Resuming accepting new clients");
            }
            return false;
        } else {
            return true;
        }
    }

    void distributeNewClients() {
        unsigned int i;

        for (i = 0; i < newClientCount; i++) {
            ServerKit::Context *ctx = servers[nextServer]->getContext();
            P_TRACE(2, "Feeding client to server thread " << (int) nextServer <<
                ": file descriptor " << newClients[i]);
            ctx->libev->runLater(boost::bind(feedNewClient, servers[nextServer],
                newClients[i]));
            nextServer = (nextServer + 1) % servers.size();
        }

        newClientCount = 0;
    }

    static void feedNewClient(Server *server, int fd) {
        server->feedNewClients(&fd, 1);
    }

    int acceptNonBlockingSocket(int serverFd) {
        union {
            struct sockaddr_in inaddr;
            struct sockaddr_in6 inaddr6;
            struct sockaddr_un unaddr;
        } u;
        socklen_t addrlen = sizeof(u);

        if (accept4Available) {
            int fd = callAccept4(serverFd,
                (struct sockaddr *) &u,
                &addrlen,
                O_NONBLOCK);
            // FreeBSD returns EINVAL if accept4() is called with invalid flags.
            if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) {
                accept4Available = false;
                return acceptNonBlockingSocket(serverFd);
            } else {
                P_LOG_FILE_DESCRIPTOR_OPEN(fd);
                return fd;
            }
        } else {
            int fd = syscalls::accept(serverFd,
                (struct sockaddr *) &u,
                &addrlen);
            FdGuard guard(fd, __FILE__, __LINE__);
            if (fd == -1) {
                return -1;
            } else {
                try {
                    setNonBlocking(fd);
                } catch (const SystemException &e) {
                    P_DEBUG("Unable to set non-blocking flag on accepted client socket: " <<
                        e.what() << " (errno=" << e.code() << ")");
                    errno = e.code();
                    return -1;
                }
                guard.clear();
                return fd;
            }
        }
    }

    void mainLoop() {
        while (!quit) {
            pollAllEndpoints();
            if (OXT_UNLIKELY(pollers[0].revents & POLLIN)) {
                // Exit pipe signaled.
                quit = true;
                break;
            }

            unsigned int i = 0;
            newClientCount = 0;

            while (newClientCount < ACCEPT_BURST_COUNT && i < nEndpoints) {
                if (pollers[i + 1].revents & POLLIN) {
                    if (!acceptNewClients(endpoints[i])) {
                        break;
                    }
                }
                i++;
            }

            distributeNewClients();
        }
    }

public:
    vector<Server *> servers;

    AcceptLoadBalancer()
        : nEndpoints(0),
          newClientCount(0),
          nextServer(0),
          accept4Available(true),
          quit(false),
          thread(NULL)
    {
        if (pipe(exitPipe) == -1) {
            int e = errno;
            throw SystemException("Cannot create pipe", e);
        }
        FdGuard guard1(exitPipe[0], __FILE__, __LINE__);
        FdGuard guard2(exitPipe[1], __FILE__, __LINE__);
        P_LOG_FILE_DESCRIPTOR_PURPOSE(exitPipe[0], "AcceptLoadBalancer: exitPipe[0]");
        P_LOG_FILE_DESCRIPTOR_PURPOSE(exitPipe[1], "AcceptLoadBalancer: exitPipe[1]");
        setNonBlocking(exitPipe[0]);
        setNonBlocking(exitPipe[1]);
        guard1.clear();
        guard2.clear();
    }

    ~AcceptLoadBalancer() {
        shutdown();
        close(exitPipe[0]);
        close(exitPipe[1]);
        P_LOG_FILE_DESCRIPTOR_CLOSE(exitPipe[0]);
        P_LOG_FILE_DESCRIPTOR_CLOSE(exitPipe[1]);
    }

    void listen(int fd) {
        #ifdef EOPNOTSUPP
            #define EXTENSION_EOPNOTSUPP EOPNOTSUPP
        #else
            #define EXTENSION_EOPNOTSUPP ENOTSUP
        #endif

        assert(nEndpoints < SERVER_KIT_MAX_SERVER_ENDPOINTS);
        setNonBlocking(fd);
        int flag = 1;
        if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)) == -1
         && errno != ENOPROTOOPT
         && errno != ENOTSUP
         && errno != EXTENSION_EOPNOTSUPP)
        {
            int e = errno;
            P_WARN("Cannot disable Nagle's algorithm on a TCP socket: " <<
                strerror(e) << " (errno=" << e << ")");
        }
        endpoints[nEndpoints] = fd;
        nEndpoints++;

        #undef EXTENSION_EOPNOTSUPP
    }

    void start() {
        boost::function<void ()> func = boost::bind(&AcceptLoadBalancer<Server>::mainLoop, this);
        thread = new oxt::thread(boost::bind(runAndPrintExceptions, func, true),
            "Load balancer");
    }

    void shutdown() {
        if (thread != NULL) {
            if (write(exitPipe[1], "x", 1) == -1) {
                if (errno != EAGAIN && errno != EWOULDBLOCK) {
                    int e = errno;
                    P_WARN("Cannot write to the load balancer's exit pipe: " <<
                        strerror(e) << " (errno=" << e << ")");
                }
            }
            thread->join();
            delete thread;
            thread = NULL;
        }
    }
};


} // namespace ServerKit
} // namespace Passenger

#endif /* _PASSENGER_SERVER_KIT_ACCEPT_LOAD_BALANCER_H_ */

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0171 ]--