Logo Search packages:      
Sourcecode: vdr-plugin-live version File versions  Download package

job.cpp

/* job.cpp
 * Copyright (C) 2003-2005 Tommi Maekitalo
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * is provided AS IS, WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, and
 * NON-INFRINGEMENT.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 *
 */

#include "tnt/job.h"
#include <tnt/httpreply.h>
#include <cxxtools/log.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

log_define("tntnet.job")

namespace tnt
{
  unsigned Job::socket_read_timeout = 200;
  unsigned Job::socket_write_timeout = 10000;
  unsigned Job::keepalive_max = 1000;
  unsigned Job::socket_buffer_size = 16384;

  Job::~Job()
  { }

  void Job::clear()
  {
    parser.reset();
    request.clear();
    touch();
  }

  int Job::msecToTimeout(time_t currentTime) const
  {
    return (lastAccessTime - currentTime + 1) * 1000
         + getKeepAliveTimeout()
         - getSocketReadTimeout();
  }

  unsigned Job::getKeepAliveTimeout()
  {
    return HttpReply::getKeepAliveTimeout();
  }

  ////////////////////////////////////////////////////////////////////////
  // Tcpjob
  //
  void Tcpjob::accept(const cxxtools::net::Server& listener)
  {
    log_debug("accept");
    socket.accept(listener);

    struct sockaddr_storage s = socket.getSockAddr();
    struct sockaddr_storage sockaddr;
    memcpy(&sockaddr, &s, sizeof(sockaddr));

    char buffer[INET6_ADDRSTRLEN];
    log_debug("connection accepted from "
      << inet_ntop(AF_INET6, &(socket.getPeeraddr()), buffer, sizeof(buffer)));

    getRequest().setPeerAddr(socket.getPeeraddr());
    getRequest().setServerAddr(sockaddr);
    getRequest().setSsl(false);
  }

  std::iostream& Tcpjob::getStream()
  {
    return socket;
  }

  int Tcpjob::getFd() const
  {
    return socket.getFd();
  }

  void Tcpjob::setRead()
  {
    socket.setTimeout(getSocketReadTimeout());
  }

  void Tcpjob::setWrite()
  {
    socket.setTimeout(getSocketWriteTimeout());
  }

#ifdef USE_SSL
  ////////////////////////////////////////////////////////////////////////
  // SslTcpjob
  //
  void SslTcpjob::accept(const SslServer& listener)
  {
    log_debug("accept (ssl)");
    socket.accept(listener);
    log_debug("connection accepted (ssl)");

    struct sockaddr_storage s = socket.getSockAddr();
    struct sockaddr_storage sockaddr;
    memcpy(&sockaddr, &s, sizeof(sockaddr));

    getRequest().setPeerAddr(socket.getPeeraddr());
    getRequest().setServerAddr(sockaddr);
    getRequest().setSsl(true);

    setRead();
  }

  std::iostream& SslTcpjob::getStream()
  {
    return socket;
  }

  int SslTcpjob::getFd() const
  {
    return socket.getFd();
  }

  void SslTcpjob::setRead()
  {
    socket.setTimeout(getSocketReadTimeout());
  }

  void SslTcpjob::setWrite()
  {
    socket.setTimeout(getSocketWriteTimeout());
  }

#endif // USE_SSL

#ifdef USE_GNUTLS
  ////////////////////////////////////////////////////////////////////////
  // GnuTlsTcpjob
  //
  void GnuTlsTcpjob::accept(const GnuTlsServer& listener)
  {
    log_debug("accept (ssl)");
    socket.accept(listener);
    log_debug("connection accepted (ssl)");

    struct sockaddr_storage s = socket.getSockAddr();
    struct sockaddr_storage sockaddr;
    memcpy(&sockaddr, &s, sizeof(sockaddr));

    getRequest().setPeerAddr(socket.getPeeraddr());
    getRequest().setServerAddr(sockaddr);
    getRequest().setSsl(true);

    setRead();
  }

  std::iostream& GnuTlsTcpjob::getStream()
  {
    return socket;
  }

  int GnuTlsTcpjob::getFd() const
  {
    return socket.getFd();
  }

  void GnuTlsTcpjob::setRead()
  {
    socket.setTimeout(getSocketReadTimeout());
  }

  void GnuTlsTcpjob::setWrite()
  {
    socket.setTimeout(getSocketWriteTimeout());
  }

#endif // USE_GNUTLS

  //////////////////////////////////////////////////////////////////////
  // Jobqueue
  //
  void Jobqueue::put(JobPtr j)
  {
    log_debug("Jobqueue::put");
    j->touch();

    cxxtools::MutexLock lock(mutex);

    if (capacity > 0)
    {
      while (jobs.size() >= capacity)
      {
        log_warn("Jobqueue full");
        notFull.wait(lock);
      }
    }

    jobs.push_back(j);

    if (waitThreads == 0)
    {
      log_info("no waiting threads left");
      noWaitThreads.signal();
    }

    notEmpty.signal();
  }

  Jobqueue::JobPtr Jobqueue::get()
  {
    // wait, until a job is available
    ++waitThreads;

    cxxtools::MutexLock lock(mutex);
    while (jobs.empty())
      notEmpty.wait(lock);

    --waitThreads;

    log_debug("Jobqueue: fetch job " << waitThreads << " waiting threads left");

    // take next job (queue is locked)
    JobPtr j = jobs.front();
    jobs.pop_front();

    // if there are more jobs, wake onther thread
    if (!jobs.empty())
      notEmpty.signal();
    notFull.signal();

    return j;
  }

}

Generated by  Doxygen 1.6.0   Back to index