/* * Copyright 2004 The WebRTC Project Authors. All rights reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include #include "webrtc/base/asyncsocket.h" #include "webrtc/base/logging.h" #include "webrtc/base/socketfactory.h" #include "webrtc/base/socketpool.h" #include "webrtc/base/socketstream.h" #include "webrtc/base/thread.h" namespace rtc { /////////////////////////////////////////////////////////////////////////////// // StreamCache - Caches a set of open streams, defers creation to a separate // StreamPool. /////////////////////////////////////////////////////////////////////////////// StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { } StreamCache::~StreamCache() { for (ConnectedList::iterator it = active_.begin(); it != active_.end(); ++it) { delete it->second; } for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); ++it) { delete it->second; } } StreamInterface* StreamCache::RequestConnectedStream( const SocketAddress& remote, int* err) { LOG_F(LS_VERBOSE) << "(" << remote << ")"; for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); ++it) { if (remote == it->first) { it->second->SignalEvent.disconnect(this); // Move from cached_ to active_ active_.push_front(*it); cached_.erase(it); if (err) *err = 0; LOG_F(LS_VERBOSE) << "Providing cached stream"; return active_.front().second; } } if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { // We track active streams so that we can remember their address active_.push_front(ConnectedStream(remote, stream)); LOG_F(LS_VERBOSE) << "Providing new stream"; return active_.front().second; } return NULL; } void StreamCache::ReturnConnectedStream(StreamInterface* stream) { for (ConnectedList::iterator it = active_.begin(); it != active_.end(); ++it) { if (stream == it->second) { LOG_F(LS_VERBOSE) << "(" << it->first << ")"; if (stream->GetState() == SS_CLOSED) { // Return closed streams LOG_F(LS_VERBOSE) << "Returning closed stream"; pool_->ReturnConnectedStream(it->second); } else { // Monitor open streams stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); LOG_F(LS_VERBOSE) << "Caching stream"; cached_.push_front(*it); } active_.erase(it); return; } } ASSERT(false); } void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { if ((events & SE_CLOSE) == 0) { LOG_F(LS_WARNING) << "(" << events << ", " << err << ") received non-close event"; return; } for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); ++it) { if (stream == it->second) { LOG_F(LS_VERBOSE) << "(" << it->first << ")"; // We don't cache closed streams, so return it. it->second->SignalEvent.disconnect(this); LOG_F(LS_VERBOSE) << "Returning closed stream"; pool_->ReturnConnectedStream(it->second); cached_.erase(it); return; } } ASSERT(false); } ////////////////////////////////////////////////////////////////////// // NewSocketPool ////////////////////////////////////////////////////////////////////// NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { } NewSocketPool::~NewSocketPool() { } StreamInterface* NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { AsyncSocket* socket = factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); if (!socket) { if (err) *err = -1; return NULL; } if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { if (err) *err = socket->GetError(); delete socket; return NULL; } if (err) *err = 0; return new SocketStream(socket); } void NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { Thread::Current()->Dispose(stream); } ////////////////////////////////////////////////////////////////////// // ReuseSocketPool ////////////////////////////////////////////////////////////////////// ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) : factory_(factory), stream_(NULL), checked_out_(false) { } ReuseSocketPool::~ReuseSocketPool() { ASSERT(!checked_out_); delete stream_; } StreamInterface* ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { // Only one socket can be used from this "pool" at a time ASSERT(!checked_out_); if (!stream_) { LOG_F(LS_VERBOSE) << "Creating new socket"; int family = remote.family(); // TODO: Deal with this when we/I clean up DNS resolution. if (remote.IsUnresolvedIP()) { family = AF_INET; } AsyncSocket* socket = factory_->CreateAsyncSocket(family, SOCK_STREAM); if (!socket) { if (err) *err = -1; return NULL; } stream_ = new SocketStream(socket); } if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; } else { remote_ = remote; stream_->Close(); if ((stream_->GetSocket()->Connect(remote_) != 0) && !stream_->GetSocket()->IsBlocking()) { if (err) *err = stream_->GetSocket()->GetError(); return NULL; } else { LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; } } stream_->SignalEvent.disconnect(this); checked_out_ = true; if (err) *err = 0; return stream_; } void ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { ASSERT(stream == stream_); ASSERT(checked_out_); checked_out_ = false; // Until the socket is reused, monitor it to determine if it closes. stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); } void ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { ASSERT(stream == stream_); ASSERT(!checked_out_); // If the stream was written to and then immediately returned to us then // we may get a writable notification for it, which we should ignore. if (events == SE_WRITE) { LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; return; } // If the peer sent data, we can't process it, so drop the connection. // If the socket has closed, clean it up. // In either case, we'll reconnect it the next time it is used. ASSERT(0 != (events & (SE_READ|SE_CLOSE))); if (0 != (events & SE_CLOSE)) { LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; } else { LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; } stream_->Close(); } /////////////////////////////////////////////////////////////////////////////// // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached // LoggingAdapters. /////////////////////////////////////////////////////////////////////////////// LoggingPoolAdapter::LoggingPoolAdapter( StreamPool* pool, LoggingSeverity level, const std::string& label, bool binary_mode) : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { } LoggingPoolAdapter::~LoggingPoolAdapter() { for (StreamList::iterator it = recycle_bin_.begin(); it != recycle_bin_.end(); ++it) { delete *it; } } StreamInterface* LoggingPoolAdapter::RequestConnectedStream( const SocketAddress& remote, int* err) { if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { ASSERT(SS_CLOSED != stream->GetState()); std::stringstream ss; ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) << stream << ")"; LOG_V(level_) << ss.str() << ((SS_OPEN == stream->GetState()) ? " Connected" : " Connecting") << " to " << remote; if (recycle_bin_.empty()) { return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); } LoggingAdapter* logging = recycle_bin_.front(); recycle_bin_.pop_front(); logging->set_label(ss.str()); logging->Attach(stream); return logging; } return NULL; } void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { LoggingAdapter* logging = static_cast(stream); pool_->ReturnConnectedStream(logging->Detach()); recycle_bin_.push_back(logging); } /////////////////////////////////////////////////////////////////////////////// } // namespace rtc