Vault  4.1
vsocket.cpp
Go to the documentation of this file.
00001 /*
00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved.
00003 This file is part of the Code Vault version 4.1
00004 http://www.bombaydigital.com/
00005 License: MIT. See LICENSE.md in the Vault top level directory.
00006 */
00007 
00010 #include "vsocket.h"
00011 #include "vtypes_internal.h"
00012 
00013 #include "vexception.h"
00014 #include "vmutexlocker.h"
00015 
00016 V_STATIC_INIT_TRACE
00017 
00018 // Initializing this static from _platform_staticInit() forces that function to be called at startup.
00019 bool VSocket::gStaticInited = VSocket::_platform_staticInit();
00020 
00021 // VSocket ----------------------------------------------------------------
00022 
00023 VString VSocket::gPreferredNetworkInterfaceName("en0"); // Interface name that getLocalHostIPAddress() prefers; changed via setPreferredNetworkInterface().
00024 VString VSocket::gPreferredLocalIPAddressPrefix; // Address prefix that getLocalHostIPAddress() prefers; changed via setPreferredLocalIPAddressPrefix().
00025 VString VSocket::gCachedLocalHostIPAddress; // Result of the most recent lookup performed by getLocalHostIPAddress().
00026 
00027 // static
00028 void VSocket::setPreferredNetworkInterface(const VString& interfaceName) {
00029     gPreferredNetworkInterfaceName = interfaceName;
00030 }
00031 
00032 // static
00033 void VSocket::setPreferredLocalIPAddressPrefix(const VString& addressPrefix) {
00034     gPreferredLocalIPAddressPrefix = addressPrefix;
00035 }
00036 
00037 // static
00038 void VSocket::getLocalHostIPAddress(VString& ipAddress, bool refresh) {
00039     if (refresh || gCachedLocalHostIPAddress.isEmpty()) {
00040         VNetworkInterfaceList interfaces = VSocket::enumerateNetworkInterfaces();
00041         for (VNetworkInterfaceList::const_iterator i = interfaces.begin(); i != interfaces.end(); ++i) {
00042             // We want the first interface, but we keep going and use the preferred one if found.
00043             if ((i == interfaces.begin()) || ((*i).mName == gPreferredNetworkInterfaceName) || ((*i).mAddress.startsWith(gPreferredLocalIPAddressPrefix))) {
00044                 gCachedLocalHostIPAddress = (*i).mAddress;
00045 
00046                 // Break out of search if reason is that we found a preferred address.
00047                 if (((*i).mName == gPreferredNetworkInterfaceName) || ((*i).mAddress.startsWith(gPreferredLocalIPAddressPrefix))) {
00048                     break;
00049                 }
00050             }
00051         }
00052     }
00053 
00054     ipAddress = gCachedLocalHostIPAddress;
00055 }
00056 
00057 // static
00058 VNetAddr VSocket::ipAddressStringToNetAddr(const VString& ipAddress) {
00059     in_addr_t addr = ::inet_addr(ipAddress);
00060     return (VNetAddr) addr;
00061 }
00062 
00063 // static
00064 void VSocket::netAddrToIPAddressString(VNetAddr netAddr, VString& ipAddress) {
00065     in_addr addr;
00066 
00067     addr.s_addr = (in_addr_t) netAddr;
00068 
00069     ipAddress.copyFromCString(::inet_ntoa(addr));
00070 }
00071 
/*
Owns the addrinfo list returned by getaddrinfo() and releases it on destruction.
Pass &mInfo as the result parameter to getaddrinfo(). If the lookup fails or is
never performed, mInfo stays NULL and nothing is freed.
*/
class AddrInfoLifeCycleHelper {
    public:
        AddrInfoLifeCycleHelper() : mInfo(NULL) {}
        ~AddrInfoLifeCycleHelper() {
            // freeaddrinfo() is not specified to accept a null pointer, so only free a real list.
            if (mInfo != NULL) {
                ::freeaddrinfo(mInfo);
            }
        }
        struct addrinfo* mInfo;

    private:
        // Not copyable: a copy would free the same list a second time.
        AddrInfoLifeCycleHelper(const AddrInfoLifeCycleHelper&);
        AddrInfoLifeCycleHelper& operator=(const AddrInfoLifeCycleHelper&);
};
00078 
/*
Builds the "hints" structure for getaddrinfo(): everything is zeroed, then the
four caller-supplied fields are filled in. Pass &mHints to getaddrinfo().
*/
class AddrInfoHintsHelper {
    public:
        AddrInfoHintsHelper(int family, int socktype, int flags, int protocol) : mHints() {
            struct addrinfo& hints = mHints;
            ::memset(&hints, 0, sizeof(hints));
            hints.ai_flags = flags;
            hints.ai_family = family;
            hints.ai_socktype = socktype;
            hints.ai_protocol = protocol;
        }
        ~AddrInfoHintsHelper() {}
        struct addrinfo mHints;
};
00091 
00092 // static
00093 VStringVector VSocket::resolveHostName(const VString& hostName) {
00094     VStringVector resolvedAddresses;
00095 
00096     AddrInfoHintsHelper     hints(AF_UNSPEC, SOCK_STREAM, 0, 0); // accept IPv4 or IPv6, we'll skip any others on receipt; stream connections only, not udp.
00097     AddrInfoLifeCycleHelper info;
00098     int result = ::getaddrinfo(hostName.chars(), NULL, &hints.mHints, &info.mInfo); // TODO: iOS solution. If WWAN is asleep, calling getaddrinfo() in isolation may return an error. See CFHost API.
00099 
00100     if (result == 0) {
00101         for (const struct addrinfo* item = info.mInfo; item != NULL; item = item->ai_next) {
00102             if ((item->ai_family == AF_INET) || (item->ai_family == AF_INET6)) {
00103                 resolvedAddresses.push_back(VSocket::_platform_addrinfoToIPAddressString(hostName, item));
00104             }
00105         }
00106     }
00107 
00108     if (result != 0) {
00109         throw VException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket::resolveHostName(%s): getaddrinfo returned %d.", hostName.chars(), result));
00110     }
00111 
00112     if (resolvedAddresses.empty()) {
00113         throw VException(VSTRING_FORMAT("VSocket::resolveHostName(%s): getaddrinfo did not resolve any addresses.", hostName.chars()));
00114     }
00115 
00116     return resolvedAddresses;
00117 }
00118 
00119 /*
00120 This is a somewhat cursory check. The exact sequence and order of dots and decimals is not verified.
00121 */
00122 // static
00123 bool VSocket::isIPv4NumericString(const VString& s) {
00124     int numDots = 0;
00125     int numDecimalDigits = 0;
00126 
00127     for (int i = 0; i < s.length(); ++i) {
00128         if (s[i] == '.') {
00129             ++numDots;
00130             continue;
00131         }
00132 
00133         if (s[i].isNumeric()) {
00134             ++numDecimalDigits;
00135             continue;
00136         }
00137 
00138         return false; // Some other character that is not part of a numeric IPv4 address.
00139     }
00140 
00141     // A cursory check of minimum number of dots and digits. Order is not checked.
00142     return (numDots == 3) && (numDecimalDigits >= 4);
00143 }
00144 
00145 /*
00146 There are lots of different forms possible. See RFC 2373.
00147 We know there must be colons present, at least two.
00148 The shortest possible value is "::".
00149 There are usually several hexadecimal segments separated by colons.
00150 There may also be dotted decimal (IPv4) elements at the end.
00151 So we check that every character is a colon, a dot, or a hexadecimal.
00152 And there must be two colons, so an explicit minimum length of 2 test is superfluous.
00153 */
00154 // static
00155 bool VSocket::isIPv6NumericString(const VString& s) {
00156     int numColons = 0;
00157 
00158     for (int i = 0; i < s.length(); ++i) {
00159         if (!((s[i] == ':') || (s[i] == '.') || s[i].isHexadecimal())) {
00160             return false;
00161         }
00162 
00163         if (s[i] == ':') {
00164             ++numColons;
00165         }
00166     }
00167 
00168     return (numColons >= 2); // The shortest possible IPv6 string is "::".
00169 }
00170 
00171 /*
00172 Scan the string once, looking for signs that it's neither an IPv4 nor IPv6 numeric address.
00173 If checking for either, this is faster than checking separately.
00174 */
00175 // static
00176 bool VSocket::isIPNumericString(const VString& s) {
00177     int numColons = 0;
00178     int numDots = 0;
00179     int numDecimalDigits = 0;
00180     int numNonDecimalHexDigits = 0;
00181 
00182     for (int i = 0; i < s.length(); ++i) {
00183         if (s[i] == ':') {
00184             ++numColons;
00185             continue;
00186         }
00187 
00188         if (s[i] == '.') {
00189             ++numDots;
00190             continue;
00191         }
00192 
00193         if (s[i].isNumeric()) {
00194             ++numDecimalDigits;
00195             continue;
00196         }
00197 
00198         if (s[i].isHexadecimal()) {
00199             ++numNonDecimalHexDigits;
00200             continue;
00201         }
00202 
00203         return false; // Some other character that is not part of a numeric IPv4 or IPv6 address.
00204     }
00205 
00206     // If we saw no colons (i.e., it's IPv4 dotted decimal) then there must be no A-F hex digits.
00207     if ((numColons == 0) && (numNonDecimalHexDigits != 0)) {
00208         return false;
00209     }
00210 
00211     // If we saw colons, it's IPv6 and the minimum is two colons.
00212     if (numColons != 0) {
00213         return (numColons >= 2); // The shortest possible IPv6 string is "::".
00214     }
00215 
00216     // We saw no colons, so the address should be IPv4. Cursory length check as in isIPv4NumericString().
00217     return (numDots == 3) && (numDecimalDigits >= 4); // A minimum of 4 digits separated by dots: "1.2.3.4"
00218 }
00219 
00220 VSocket::VSocket()
00221     : mSocketID(kNoSocketID)
00222     , mHostIPAddress()
00223     , mPortNumber(0)
00224     , mReadTimeOutActive(false)
00225     , mReadTimeOut()
00226     , mWriteTimeOutActive(false)
00227     , mWriteTimeOut()
00228     , mRequireReadAll(true)
00229     , mNumBytesRead(0)
00230     , mNumBytesWritten(0)
00231     , mLastEventTime()
00232     , mSocketName()
00233     {
00234 }
00235 
00236 VSocket::VSocket(VSocketID id)
00237     : mSocketID(id)
00238     , mHostIPAddress()
00239     , mPortNumber(0)
00240     , mReadTimeOutActive(false)
00241     , mReadTimeOut()
00242     , mWriteTimeOutActive(false)
00243     , mWriteTimeOut()
00244     , mRequireReadAll(true)
00245     , mNumBytesRead(0)
00246     , mNumBytesWritten(0)
00247     , mLastEventTime()
00248     , mSocketName()
00249     {
00250 }
00251 
00252 VSocket::~VSocket() {
00253     this->close();
00254 }
00255 
00256 void VSocket::setHostIPAddressAndPort(const VString& hostIPAddress, int portNumber) {
00257     mHostIPAddress = hostIPAddress;
00258     mPortNumber = portNumber;
00259     mSocketName.format("%s:%d", hostIPAddress.chars(), portNumber);
00260 }
00261 
00262 void VSocket::connectToIPAddress(const VString& ipAddress, int portNumber) {
00263     this->_connectToIPAddress(ipAddress, portNumber);
00264     this->setDefaultSockOpt();
00265 }
00266 
00267 void VSocket::connectToHostName(const VString& hostName, int portNumber) {
00268     this->connectToHostName(hostName, portNumber, VSocketConnectionStrategySingle());
00269 }
00270 
00271 void VSocket::connectToHostName(const VString& hostName, int portNumber, const VSocketConnectionStrategy& connectionStrategy) {
00272     connectionStrategy.connect(hostName, portNumber, *this);
00273 }
00274 
00275 VString VSocket::getHostIPAddress() const {
00276     return mHostIPAddress;
00277 }
00278 
00279 int VSocket::getPortNumber() const {
00280     return mPortNumber;
00281 }
00282 
00283 void VSocket::close() {
00284     if (mSocketID != kNoSocketID) {
00285         vault::closeSocket(mSocketID);
00286         mSocketID = kNoSocketID;
00287     }
00288 }
00289 
00290 void VSocket::flush() {
00291     // Intentionally empty here. If a subclass needs to flush, it will override this method.
00292 }
00293 
00294 void VSocket::setIntSockOpt(int level, int name, int value) {
00295     int intValue = value;
00296     this->setSockOpt(level, name, static_cast<void*>(&intValue), sizeof(intValue));
00297 }
00298 
00299 void VSocket::setLinger(int val) {
00300     struct linger lingerParam;
00301 
00302     lingerParam.l_onoff = 1;
00303 
00304 #ifdef VPLATFORM_WIN
00305     lingerParam.l_linger = static_cast<u_short>(val); // max linger time while closing
00306 #else
00307     lingerParam.l_linger = val; // max linger time while closing
00308 #endif
00309 
00310     // turn linger on
00311     this->setSockOpt(SOL_SOCKET, SO_LINGER, static_cast<void*>(&lingerParam), sizeof(lingerParam));
00312 }
00313 
00314 void VSocket::clearReadTimeOut() {
00315     mReadTimeOutActive = false;
00316 }
00317 
00318 void VSocket::setReadTimeOut(const struct timeval& timeout) {
00319     mReadTimeOutActive = true;
00320     mReadTimeOut = timeout;
00321 }
00322 
00323 void VSocket::clearWriteTimeOut() {
00324     mWriteTimeOutActive = false;
00325 }
00326 
00327 void VSocket::setWriteTimeOut(const struct timeval& timeout) {
00328     mWriteTimeOutActive = true;
00329     mWriteTimeOut = timeout;
00330 }
00331 
00332 void VSocket::setDefaultSockOpt() {
00333     // set buffer sizes
00334     this->setIntSockOpt(SOL_SOCKET, SO_RCVBUF, kDefaultBufferSize);
00335     this->setIntSockOpt(SOL_SOCKET, SO_SNDBUF, kDefaultBufferSize);
00336 
00337 #ifndef VPLATFORM_WIN
00338     // set type of service
00339     this->setIntSockOpt(IPPROTO_IP, IP_TOS, kDefaultServiceType);
00340 #endif
00341 
00342 #ifdef VPLATFORM_MAC
00343     // Normally, Unix systems will signal SIGPIPE if recv() or send() fails because the
00344     // other side has closed the socket. Not desirable; we'd rather get back an error code
00345     // like all other error types, so we can throw an exception. On Mac OS X we make this
00346     // happen by disabling SIG_PIPE here. On other Unix platforms we pass MSG_NOSIGNAL as
00347     // flags value for send() and recv() (see /_unix/vsocket.cpp).
00348     this->setIntSockOpt(SOL_SOCKET, SO_NOSIGPIPE, 1);
00349 #endif
00350 
00351     // set no delay
00352     this->setIntSockOpt(IPPROTO_TCP, TCP_NODELAY, kDefaultNoDelay);
00353 }
00354 
00355 Vs64 VSocket::numBytesRead() const {
00356     return mNumBytesRead;
00357 }
00358 
00359 Vs64 VSocket::numBytesWritten() const {
00360     return mNumBytesWritten;
00361 }
00362 
00363 VDuration VSocket::getIdleTime() const {
00364     VInstant now;
00365     return now - mLastEventTime;
00366 }
00367 
00368 int VSocket::read(Vu8* buffer, int numBytesToRead) {
00369     if (! VSocket::_platform_isSocketIDValid(mSocketID)) {
00370         throw VStackTraceException(VSTRING_FORMAT("VSocket[%s] read: Invalid socket ID %d.", mSocketName.chars(), mSocketID));
00371     }
00372 
00373     int     bytesRemainingToRead = numBytesToRead;
00374     Vu8*    nextBufferPositionPtr = buffer;
00375     fd_set  readset;
00376 
00377     while (bytesRemainingToRead > 0) {
00378 
00379         FD_ZERO(&readset);
00380         FD_SET(mSocketID, &readset);
00381         int result = ::select(SelectSockIDTypeCast (mSocketID + 1), &readset, NULL, NULL, (mReadTimeOutActive ? &mReadTimeOut : NULL));
00382 
00383         if (result < 0) {
00384             VSystemError e = VSystemError::getSocketError();
00385             if (e.isLikePosixError(EINTR)) {
00386                 // Debug message: read was interrupted but we will cycle around and try again...
00387                 continue;
00388             }
00389 
00390             if (e.isLikePosixError(EBADF)) {
00391                 throw VSocketClosedException(e, VSTRING_FORMAT("VSocket[%s] read: Socket has closed (EBADF).", mSocketName.chars()));
00392             } else {
00393                 throw VException(e, VSTRING_FORMAT("VSocket[%s] read: Select failed. Result=%d.", mSocketName.chars(), result));
00394             }
00395         } else if (result == 0) {
00396             throw VException(VSTRING_FORMAT("VSocket[%s] read: Select timed out.", mSocketName.chars()));
00397         }
00398 
00399         if (!FD_ISSET(mSocketID, &readset)) {
00400             throw VException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] read: Select got FD_ISSET false.", mSocketName.chars()));
00401         }
00402 
00403         int theNumBytesRead = SendRecvResultTypeCast ::recv(mSocketID, RecvBufferPtrTypeCast nextBufferPositionPtr, SendRecvByteCountTypeCast bytesRemainingToRead, VSOCKET_DEFAULT_RECV_FLAGS);
00404 
00405         if (theNumBytesRead < 0) {
00406             VSystemError e = VSystemError::getSocketError();
00407             if (e.isLikePosixError(EPIPE)) {
00408                 throw VSocketClosedException(e, VSTRING_FORMAT("VSocket[%s] read: Socket has closed (EPIPE).", mSocketName.chars()));
00409             } else {
00410                 throw VException(e, VSTRING_FORMAT("VSocket[%s] read: recv failed. Result=%d.", mSocketName.chars(), theNumBytesRead));
00411             }
00412         } else if (theNumBytesRead == 0) {
00413             if (mRequireReadAll) {
00414                 throw VEOFException(VSTRING_FORMAT("VSocket[%s] read: recv of %d bytes returned 0 bytes.", mSocketName.chars(), bytesRemainingToRead));
00415             } else {
00416                 break;    // got successful but partial read, caller will have to keep reading
00417             }
00418         }
00419 
00420         bytesRemainingToRead -= theNumBytesRead;
00421         nextBufferPositionPtr += theNumBytesRead;
00422 
00423         mNumBytesRead += theNumBytesRead;
00424     }
00425 
00426     mLastEventTime.setNow();
00427 
00428     return (numBytesToRead - bytesRemainingToRead);
00429 }
00430 
00431 int VSocket::write(const Vu8* buffer, int numBytesToWrite) {
00432     if (! VSocket::_platform_isSocketIDValid(mSocketID)) {
00433         throw VStackTraceException(VSTRING_FORMAT("VSocket[%s] write: Invalid socket ID %d.", mSocketName.chars(), mSocketID));
00434     }
00435 
00436     const Vu8*  nextBufferPositionPtr = buffer;
00437     int         bytesRemainingToWrite = numBytesToWrite;
00438     fd_set      writeset;
00439 
00440     while (bytesRemainingToWrite > 0) {
00441 
00442         FD_ZERO(&writeset);
00443         FD_SET(mSocketID, &writeset);
00444         int result = ::select(SelectSockIDTypeCast (mSocketID + 1), NULL, &writeset, NULL, (mWriteTimeOutActive ? &mWriteTimeOut : NULL));
00445 
00446         if (result < 0) {
00447             VSystemError e = VSystemError::getSocketError();
00448             if (e.isLikePosixError(EINTR)) {
00449                 // Debug message: write was interrupted but we will cycle around and try again...
00450                 continue;
00451             }
00452 
00453             if (e.isLikePosixError(EBADF)) {
00454                 throw VSocketClosedException(e, VSTRING_FORMAT("VSocket[%s] write: Socket has closed (EBADF).", mSocketName.chars()));
00455             } else {
00456                 throw VException(e, VSTRING_FORMAT("VSocket[%s] write: select() failed. Result=%d.", mSocketName.chars(), result));
00457             }
00458         } else if (result == 0) {
00459             throw VException(VSTRING_FORMAT("VSocket[%s] write: Select timed out.", mSocketName.chars()));
00460         }
00461 
00462         int theNumBytesWritten = SendRecvResultTypeCast ::send(mSocketID, SendBufferPtrTypeCast nextBufferPositionPtr, SendRecvByteCountTypeCast bytesRemainingToWrite, VSOCKET_DEFAULT_SEND_FLAGS);
00463 
00464         if (theNumBytesWritten <= 0) {
00465             VSystemError e = VSystemError::getSocketError();
00466             if (e.isLikePosixError(EPIPE)) {
00467                 throw VSocketClosedException(e, VSTRING_FORMAT("VSocket[%s] write: Socket has closed (EPIPE).", mSocketName.chars()));
00468             } else {
00469                 throw VException(e, VSTRING_FORMAT("VSocket[%s] write: send() failed.", mSocketName.chars()));
00470             }
00471         } else if (theNumBytesWritten != bytesRemainingToWrite) {
00472             // Debug message: write was only partially completed so we will cycle around and write the rest...
00473         } else {
00474             // This is where you could put debug/trace-mode socket write logging output....
00475         }
00476 
00477         bytesRemainingToWrite -= theNumBytesWritten;
00478         nextBufferPositionPtr += theNumBytesWritten;
00479 
00480         mNumBytesWritten += theNumBytesWritten;
00481     }
00482 
00483     return (numBytesToWrite - bytesRemainingToWrite);
00484 }
00485 
00486 void VSocket::discoverHostAndPort() {
00487     struct sockaddr_in  info;
00488     VSocklenT           infoLength = sizeof(info);
00489 
00490     int result = ::getpeername(mSocketID, (struct sockaddr*) &info, &infoLength);
00491     if (result != 0) {
00492         throw VStackTraceException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] discoverHostAndPort: getpeername() failed.", mSocketName.chars()));
00493     }
00494 
00495     int portNumber = (int) V_BYTESWAP_NTOH_S16_GET(static_cast<Vs16>(info.sin_port));
00496 
00497     const char* ipAddress = ::inet_ntoa(info.sin_addr);
00498     this->setHostIPAddressAndPort(VSTRING_COPY(ipAddress), portNumber);
00499 }
00500 
00501 void VSocket::closeRead() {
00502     int result = ::shutdown(mSocketID, SHUT_RD);
00503 
00504     if (result < 0) {
00505         throw VException(VSTRING_FORMAT("VSocket[%s] closeRead: Unable to shut down socket.", mSocketName.chars()));
00506     }
00507 }
00508 
00509 void VSocket::closeWrite() {
00510     int result = ::shutdown(mSocketID, SHUT_WR);
00511 
00512     if (result < 0) {
00513         throw VException(VSTRING_FORMAT("VSocket[%s] closeWrite: Unable to shut down socket.", mSocketName.chars()));
00514     }
00515 }
00516 
00517 void VSocket::setSockOpt(int level, int name, void* valuePtr, int valueLength) {
00518     (void) ::setsockopt(mSocketID, level, name, SetSockOptValueTypeCast valuePtr, valueLength);
00519 }
00520 
00521 void VSocket::_connectToIPAddress(const VString& ipAddress, int portNumber) {
00522     this->setHostIPAddressAndPort(ipAddress, portNumber);
00523 
00524     bool        isIPv4 = VSocket::isIPv4NumericString(ipAddress);
00525     VSocketID   socketID = ::socket((isIPv4 ? AF_INET : AF_INET6), SOCK_STREAM, 0);
00526 
00527     if (VSocket::_platform_isSocketIDValid(socketID)) {
00528 
00529         const sockaddr* infoPtr = NULL;
00530         socklen_t infoLen = 0;
00531         struct sockaddr_in infoIPv4;
00532 #ifdef VPLATFORM_WIN
00533         addrinfo addrInfo;
00534         AddrInfoLifeCycleHelper addrInfoResults;
00535 #else
00536         struct sockaddr_in6 infoIPv6;
00537 #endif
00538 
00539         if (isIPv4) {
00540             ::memset(&infoIPv4, 0, sizeof(infoIPv4));
00541             infoIPv4.sin_family = AF_INET;
00542             infoIPv4.sin_port = (in_port_t) V_BYTESWAP_HTON_S16_GET(static_cast<Vs16>(portNumber));
00543             infoIPv4.sin_addr.s_addr = ::inet_addr(ipAddress);
00544             infoPtr = (const sockaddr*) &infoIPv4;
00545             infoLen = sizeof(infoIPv4);
00546         } else {
00547 #ifdef VPLATFORM_WIN
00548 //#ifdef _WIN32_WINNT <= 0x501 // Cannot use inet_pton until Vista
00549             VString portString = VSTRING_INT(portNumber);
00550             ::memset(&addrInfo, 0, sizeof(addrInfo));
00551             addrInfo.ai_family = AF_INET6;
00552             addrInfo.ai_flags |= AI_NUMERICHOST;
00553             int getaddrinfoResult = ::getaddrinfo(ipAddress, portString, &addrInfo, &addrInfoResults.mInfo);
00554             if (getaddrinfoResult != 0) {
00555                 throw VException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] _connectToIPAddress: getaddrinfo() failed.", mSocketName.chars()));
00556             }
00557             infoPtr = (const sockaddr*) addrInfoResults.mInfo->ai_addr;
00558             infoLen = addrInfoResults.mInfo->ai_addrlen;
00559 #else
00560 
00561             ::memset(&infoIPv6, 0, sizeof(infoIPv6));
00562 //#ifndef VPLATFORM_WIN /* sin6_len is not defined in the Winsock definition! */
00563             infoIPv6.sin6_len = sizeof(infoIPv6);
00564 //#endif
00565             infoIPv6.sin6_family = AF_INET6;
00566             infoIPv6.sin6_port = (in_port_t) V_BYTESWAP_HTON_S16_GET(static_cast<Vs16>(portNumber));
00567             int ptonResult = ::inet_pton(AF_INET6, ipAddress, &infoIPv6.sin6_addr);
00568             if (ptonResult != 1) {
00569                 throw VException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] _connectToIPAddress: inet_pton() failed.", mSocketName.chars()));
00570             }
00571             infoPtr = (const sockaddr*) &infoIPv6;
00572             infoLen = sizeof(infoIPv6);
00573 #endif
00574         }
00575 
00576         int result = ::connect(socketID, infoPtr, infoLen);
00577 
00578         if (result != 0) {
00579             // Connect failed.
00580             VSystemError e = VSystemError::getSocketError(); // Call before calling vault::closeSocket(), which will succeed and clear the error code!
00581             vault::closeSocket(socketID);
00582             throw VException(e, VSTRING_FORMAT("VSocket[%s] _connect: Connect failed.", mSocketName.chars()));
00583         }
00584     }
00585 
00586     mSocketID = socketID;
00587 }
00588 
00589 void VSocket::_listen(const VString& bindAddress, int backlog) {
00590     VSocketID           listenSockID = kNoSocketID;
00591     struct sockaddr_in  info;
00592     int                 infoLength = sizeof(info);
00593     const int           on = 1;
00594 
00595     ::memset(&info, 0, sizeof(info));
00596     info.sin_family = AF_INET;
00597     info.sin_port = (in_port_t) V_BYTESWAP_HTON_S16_GET(static_cast<Vs16>(mPortNumber));
00598 
00599     if (bindAddress.isEmpty()) {
00600         info.sin_addr.s_addr = INADDR_ANY;
00601     } else {
00602         info.sin_addr.s_addr = inet_addr(bindAddress);
00603     }
00604 
00605     listenSockID = ::socket(AF_INET, SOCK_STREAM, 0);
00606     if (! VSocket::_platform_isSocketIDValid(listenSockID)) {
00607         throw VStackTraceException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] listen: socket() failed. Result=%d.", mSocketName.chars(), listenSockID));
00608     }
00609 
00610     // Once we've successfully called ::socket(), if something else fails here, we need
00611     // to close that socket. We can just throw upon any failed call, and use a try/catch
00612     // with re-throw after closure.
00613 
00614     try {
00615         int result = ::setsockopt(listenSockID, SOL_SOCKET, SO_REUSEADDR, SetSockOptValueTypeCast &on, sizeof(on));
00616         if (result != 0) {
00617             throw VStackTraceException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] listen: setsockopt() failed. Result=%d.", mSocketName.chars(), result));
00618         }
00619 
00620         result = ::bind(listenSockID, (const sockaddr*) &info, infoLength);
00621         if (result != 0) {
00622             throw VStackTraceException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] listen: bind() failed. Result=%d.", mSocketName.chars(), result));
00623         }
00624 
00625         result = ::listen(listenSockID, backlog);
00626         if (result != 0) {
00627             throw VStackTraceException(VSystemError::getSocketError(), VSTRING_FORMAT("VSocket[%s] listen: listen() failed. Result=%d.", mSocketName.chars(), result));
00628         }
00629 
00630     } catch (...) {
00631         vault::closeSocket(listenSockID);
00632         throw;
00633     }
00634 
00635     mSocketID = listenSockID;
00636 }
00637 
00638 VSocketID VSocket::getSockID() const {
00639     return mSocketID;
00640 }
00641 
00642 void VSocket::setSockID(VSocketID id) {
00643     mSocketID = id;
00644 }
00645 
00646 // VSocketInfo ----------------------------------------------------------------
00647 
00648 VSocketInfo::VSocketInfo(const VSocket& socket)
00649     : mSocketID(socket.getSockID())
00650     , mHostIPAddress(socket.getHostIPAddress())
00651     , mPortNumber(socket.getPortNumber())
00652     , mNumBytesRead(socket.numBytesRead())
00653     , mNumBytesWritten(socket.numBytesWritten())
00654     , mIdleTime(socket.getIdleTime())
00655     {
00656 }
00657 
00658 // VSocketConnectionStrategySingle --------------------------------------------
00659 
00660 void VSocketConnectionStrategySingle::connect(const VString& hostName, int portNumber, VSocket& socketToConnect) const {
00661     VStringVector ipAddresses = (mDebugIPAddresses.empty() ? VSocket::resolveHostName(hostName) : mDebugIPAddresses);
00662     socketToConnect.connectToIPAddress(ipAddresses[0], portNumber);
00663 }
00664 
00665 // VSocketConnectionStrategyLinear --------------------------------------------
00666 
00667 VSocketConnectionStrategyLinear::VSocketConnectionStrategyLinear(const VDuration& timeout)
00668     : VSocketConnectionStrategy()
00669     , mTimeout(timeout)
00670     {
00671 }
00672 
00673 void VSocketConnectionStrategyLinear::connect(const VString& hostName, int portNumber, VSocket& socketToConnect) const {
00674     // Timeout should never cause expiration before we do DNS resolution or try the first IP address.
00675     // Therefore, we calculate the expiration time, but then to DNS first, and check timeout after each failed connect.
00676     VInstant expirationTime = VInstant() + mTimeout;
00677     VStringVector ipAddresses = (mDebugIPAddresses.empty() ? VSocket::resolveHostName(hostName) : mDebugIPAddresses);
00678     for (VStringVector::const_iterator i = ipAddresses.begin(); i != ipAddresses.end(); ++i) {
00679         try {
00680             socketToConnect.connectToIPAddress(*i, portNumber);
00681             return; // As soon as we succeed, return.
00682         } catch (const VException& ex) {
00683             VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyLinear::connect(%s): Failed to connect to '%s'. %s", hostName.chars(), (*i).chars(), ex.what()));
00684             if (VInstant(/*now*/) >= expirationTime) {
00685                 throw;
00686             }
00687         }
00688     }
00689 
00690     throw VException("VSocketConnectionStrategyLinear::connect: Failed to connect to all resolved names.");
00691 }
00692 
00693 // VSocketConnectionStrategyThreadedWorker ------------------------------------
00694 
00695 class VSocketConnectionStrategyThreadedRunner; // Forward declaration: the worker below holds a pointer to its owner runner.
00696 
00697 class VSocketConnectionStrategyThreadedWorker : public VThread {
00698     public:
00699 
00700         VSocketConnectionStrategyThreadedWorker(VSocketConnectionStrategyThreadedRunner* ownerRunner, const VString& ipAddressToConnect, int portNumberToConnect);
00701         virtual ~VSocketConnectionStrategyThreadedWorker();
00702 
00703         // VThread implementation:
00704         virtual void run();
00705 
00706     private:
00707 
00708         // These contain the code to communicate safely with the owner (which is running in another thread),
00709         // to let it know (if it's still around!) that we are done, whether by success or by failure.
00710         void _handleSuccess(VSocket& openedSocket);
00711         void _handleFailure(const VException& ex);
00712 
00713         VMutex                                      mMutex; // NOTE(review): presumably guards mOwnerRunner, which the runner's thread may also touch -- confirm against the method bodies.
00714         VSocketConnectionStrategyThreadedRunner*    mOwnerRunner; // Set from the ownerRunner constructor argument.
00715         VString                                     mIPAddressToConnect; // Set from the ipAddressToConnect constructor argument.
00716         int                                         mPortNumberToConnect; // Set from the portNumberToConnect constructor argument.
00717 };
00718 
00719 // VSocketConnectionStrategyThreadedRunner ------------------------------------
00720 
00730 class VSocketConnectionStrategyThreadedRunner : public VThread {
00731     public:
00732         VSocketConnectionStrategyThreadedRunner(const VDuration& timeoutInterval, int maxNumThreads, const VString& hostName, int portNumber, const VStringVector& debugIPAddresses);
00733         virtual ~VSocketConnectionStrategyThreadedRunner();
00734 
00735         // VThread implementation:
00736         virtual void run();
00737 
00738         // Caller should start() this thread, and then aggressively check for completion via hasAnswer().
00739         // Once done, call getConnectedSockID(), and if it's not kNoSocketID, it's a connected socket ID to take over.
00740         // Call getConnectedIPAddress() to find out where we got connected to.
00741         // Finally, call detachFromStrategy() to signal that you will no longer refer to the runner,
00742         // so that it can self-destruct.
00743         bool        hasAnswer() const;
00744         VSocketID   getConnectedSockID() const;
00745         VString     getConnectedIPAddress() const;
00746         void        detachFromStrategy();
00747 
00748     private:
00749 
00750         bool _isDone() const;
00751         bool _isDetachedFromStrategy() const;
00752         void _lockedStartWorker(const VString& ipAddressToConnect);
00753         void _lockedForgetOneWorker(VSocketConnectionStrategyThreadedWorker* worker); // forgets one worker but assumes that worker will no longer reference us
00754         void _lockedForgetAllWorkers(); // forgets all workers and tells them to stop referring to us
00755 
00756         const VInstant      mExpiry;    // Construction time plus timeout interval. After this instant, we stop creating new threads.
00757         const int           mMaxNumThreads;
00758         const VString       mHostNameToConnect;
00759         const int           mPortNumberToConnect;
00760         const VStringVector mDebugIPAddresses;
00761 
00762         bool            mDetachedFromStrategy; // NOTE(review): presumably set by detachFromStrategy() -- confirm in its definition.
00763         mutable VMutex  mMutex; // Declared mutable so that it is usable from the const member functions.
00764         VStringVector   mIPAddressesYetToTry;
00765 
00766         bool            mConnectionCompleted;
00767         bool            mAllWorkersFailed;
00768         VSocketID       mConnectedSocketID;
00769         VString         mConnectedSocketIPAddress;
00770 
00771         typedef std::deque<VSocketConnectionStrategyThreadedWorker*> WorkerList;
00772         WorkerList      mWorkers;
00773 
00774         // Private functions called only by our worker friend class.
00775         friend class VSocketConnectionStrategyThreadedWorker;
00776         void _workerSucceeded(VSocketConnectionStrategyThreadedWorker* worker, VSocket& openedSocket);
00777         void _workerFailed(VSocketConnectionStrategyThreadedWorker* worker, const VException& ex);
00778 
00779 };
00780 
00781 // VSocketConnectionStrategyThreadedWorker ------------------------------------
00782 
00783 VSocketConnectionStrategyThreadedWorker::VSocketConnectionStrategyThreadedWorker(VSocketConnectionStrategyThreadedRunner* ownerRunner, const VString& ipAddressToConnect, int portNumberToConnect)
00784     : VThread(VSTRING_FORMAT("VSocketConnectionStrategyThreadedWorker.%s:%d", ipAddressToConnect.chars(), portNumberToConnect), "vault.sockets.VSocketConnectionStrategyThreadedWorker", kDeleteSelfAtEnd, kCreateThreadDetached, NULL)
00785     , mMutex(mName)
00786     , mOwnerRunner(ownerRunner)
00787     , mIPAddressToConnect(ipAddressToConnect)
00788     , mPortNumberToConnect(portNumberToConnect)
00789     {
00790     VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedWorker %s:%d constructor.", mIPAddressToConnect.chars(), mPortNumberToConnect));
00791 }
00792 
00793 VSocketConnectionStrategyThreadedWorker::~VSocketConnectionStrategyThreadedWorker() {
00794     VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedWorker %s:%d destructor.", mIPAddressToConnect.chars(), mPortNumberToConnect));
00795 }
00796 
00797 void VSocketConnectionStrategyThreadedWorker::run() {
00798     VInstant connectStart;
00799     try {
00800         VSocket tempSocket;
00801         tempSocket.connectToIPAddress(mIPAddressToConnect, mPortNumberToConnect);
00802         VDuration duration(connectStart);
00803         VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedWorker %s:%d run() succeeded with sockid %d in %s.", mIPAddressToConnect.chars(), mPortNumberToConnect, (int) tempSocket.getSockID(), duration.getDurationString().chars()));
00804         this->_handleSuccess(tempSocket);
00805     } catch (const VException& ex) {
00806         VDuration duration(connectStart);
00807         VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedWorker %s:%d run() failed in %s.", mIPAddressToConnect.chars(), mPortNumberToConnect, duration.getDurationString().chars()));
00808         this->_handleFailure(ex);
00809     }
00810 }
00811 
00812 void VSocketConnectionStrategyThreadedWorker::_handleSuccess(VSocket& openedSocket) {
00813     VMutexLocker locker(&mMutex, "VSocketConnectionStrategyThreadedWorker::_handleSuccess");
00814     if (mOwnerRunner != NULL) {
00815         mOwnerRunner->_workerSucceeded(this, openedSocket);
00816         mOwnerRunner = NULL;
00817     }
00818 }
00819 
00820 void VSocketConnectionStrategyThreadedWorker::_handleFailure(const VException& ex) {
00821     VMutexLocker locker(&mMutex, "VSocketConnectionStrategyThreadedWorker::_handleFailure");
00822     if (mOwnerRunner != NULL) {
00823         mOwnerRunner->_workerFailed(this, ex);
00824         mOwnerRunner = NULL;
00825     }
00826 }
00827 
00828 // VSocketConnectionStrategyThreadedRunner ------------------------------------
00829 
00830 VSocketConnectionStrategyThreadedRunner::VSocketConnectionStrategyThreadedRunner(const VDuration& timeoutInterval, int maxNumThreads, const VString& hostName, int portNumber, const VStringVector& debugIPAddresses)
00831     : VThread(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner.%s:%d", hostName.chars(), portNumber), "vault.sockets.VSocketConnectionStrategyThreadedRunner", kDeleteSelfAtEnd, kCreateThreadDetached, NULL)
00832     , mExpiry(VInstant() + timeoutInterval)
00833     , mMaxNumThreads(maxNumThreads)
00834     , mHostNameToConnect(hostName)
00835     , mPortNumberToConnect(portNumber)
00836     , mDebugIPAddresses(debugIPAddresses)
00837     , mDetachedFromStrategy(false)
00838     , mMutex(mName)
00839     , mIPAddressesYetToTry()
00840     , mConnectionCompleted(false)
00841     , mAllWorkersFailed(false)
00842     , mConnectedSocketID(VSocket::kNoSocketID)
00843     , mConnectedSocketIPAddress()
00844     , mWorkers()
00845     {
00846     VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner %s:%d constructor.", mHostNameToConnect.chars(), mPortNumberToConnect));
00847 }
00848 
00849 VSocketConnectionStrategyThreadedRunner::~VSocketConnectionStrategyThreadedRunner() {
00850     VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner %s:%d destructor.", mHostNameToConnect.chars(), mPortNumberToConnect));
00851 }
00852 
00853 void VSocketConnectionStrategyThreadedRunner::run() {
00854 
00855     /* locking scope */ {
00856         VMutexLocker locker(&mMutex, "VSocketConnectionStrategyThreadedRunner::run() starting initial workers");
00857         VStringVector ipAddresses = (mDebugIPAddresses.empty() ? VSocket::resolveHostName(mHostNameToConnect) : mDebugIPAddresses);
00858         int numWorkersRemaining = mMaxNumThreads;
00859         for (size_t i = 0; i < ipAddresses.size(); ++i) {
00860             //for (size_t i1 = ipAddresses.size(); i1 > 0; --i1) { int i = i1-1; // try backwards to get that google.com IPv6 address
00861             if (numWorkersRemaining == 0) {
00862                 mIPAddressesYetToTry.push_back(ipAddresses[i]);
00863             } else {
00864                 this->_lockedStartWorker(ipAddresses[i]);
00865                 --numWorkersRemaining;
00866             }
00867         }
00868     }
00869 
00870     // More workers will be created when and if others complete unsuccessfully.
00871 
00872     while (! this->_isDone()) {
00873         VThread::sleep(VDuration::MILLISECOND());
00874     }
00875 
00876     while (! this->_isDetachedFromStrategy()) {
00877         VThread::sleep(VDuration::MILLISECOND());
00878     }
00879 
00880 }
00881 
00882 bool VSocketConnectionStrategyThreadedRunner::hasAnswer() const {
00883     VMutexLocker locker(&mMutex, "hasAnswer");
00884     return mConnectionCompleted || mAllWorkersFailed || (VInstant() > mExpiry);
00885 }
00886 
00887 VSocketID VSocketConnectionStrategyThreadedRunner::getConnectedSockID() const {
00888     VMutexLocker locker(&mMutex, "getConnectedSockID");
00889     return mConnectedSocketID;
00890 }
00891 
00892 VString VSocketConnectionStrategyThreadedRunner::getConnectedIPAddress() const {
00893     VMutexLocker locker(&mMutex, "getConnectedIPAddress");
00894     return mConnectedSocketIPAddress;
00895 }
00896 
00897 void VSocketConnectionStrategyThreadedRunner::detachFromStrategy() {
00898     VMutexLocker locker(&mMutex, "detachFromStrategy");
00899     mDetachedFromStrategy = true;
00900 }
00901 
00902 bool VSocketConnectionStrategyThreadedRunner::_isDone() const {
00903     VMutexLocker locker(&mMutex, "_done");
00904     return mWorkers.empty();
00905 }
00906 
00907 bool VSocketConnectionStrategyThreadedRunner::_isDetachedFromStrategy() const {
00908     VMutexLocker locker(&mMutex, "_isDetachedFromStrategy");
00909     return mDetachedFromStrategy;
00910 }
00911 
00912 void VSocketConnectionStrategyThreadedRunner::_lockedStartWorker(const VString& ipAddressToConnect) {
00913     VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner starting worker %s:%d.", ipAddressToConnect.chars(), mPortNumberToConnect));
00914     VSocketConnectionStrategyThreadedWorker* worker = new VSocketConnectionStrategyThreadedWorker(this, ipAddressToConnect, mPortNumberToConnect);
00915     mWorkers.push_back(worker);
00916     worker->start();
00917 }
00918 
00919 void VSocketConnectionStrategyThreadedRunner::_workerSucceeded(VSocketConnectionStrategyThreadedWorker* worker, VSocket& openedSocket) {
00920     VMutexLocker locker(&mMutex, VSTRING_FORMAT("_workerSucceeded(%s)", worker->getName().chars()));
00921     if (mConnectionCompleted) {
00922         VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner %s:%d _workerSucceeded(sockid %d) ignored because another worker has already won.", openedSocket.getHostIPAddress().chars(), mPortNumberToConnect, (int) openedSocket.getSockID()));
00923     } else {
00924         VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner %s:%d _workerSucceeded(sockid %d) wins.", openedSocket.getHostIPAddress().chars(), mPortNumberToConnect, (int) openedSocket.getSockID()));
00925 
00926         mConnectedSocketID = openedSocket.getSockID();
00927         mConnectedSocketIPAddress = openedSocket.getHostIPAddress();
00928         openedSocket.setSockID(VSocket::kNoSocketID); // So when it destructs on return from this function, it will NOT close the adopted socket ID.
00929 
00930         mConnectionCompleted = true;
00931     }
00932 
00933     this->_lockedForgetOneWorker(worker);
00934 }
00935 
00936 void VSocketConnectionStrategyThreadedRunner::_workerFailed(VSocketConnectionStrategyThreadedWorker* worker, const VException& ex) {
00937     VMutexLocker locker(&mMutex, VSTRING_FORMAT("_workerFailed(%s)", worker->getName().chars()));
00938     this->_lockedForgetOneWorker(worker);
00939 
00940     VLOGGER_ERROR(VSTRING_FORMAT("VSocketConnectionStrategyThreadedRunner::_workerFailed: %s", ex.what()));
00941 
00942     // If we have yet to succeed, start another worker thread if we have more addresses to try.
00943     if (!mConnectionCompleted) {
00944         if (mIPAddressesYetToTry.empty()) {
00945             // Nothing left to try.
00946         } else if (VInstant() > mExpiry) {
00947             // Too much time has elapsed. Give up. Don't start a new worker. Clear the "to do" list.
00948             // Mark failure so that the caller can immediately proceed, not waiting for any other
00949             // outstanding workers to complete. The presence of an overdue expiry means we failed.
00950             mIPAddressesYetToTry.clear();
00951             mAllWorkersFailed = true;
00952         } else {
00953             // Pop the next address off and start a worker for it.
00954             VString nextIPAddressToTry = mIPAddressesYetToTry[0];
00955             mIPAddressesYetToTry.erase(mIPAddressesYetToTry.begin());
00956             this->_lockedStartWorker(nextIPAddressToTry);
00957         }
00958     }
00959 
00960     // If that failure was the last worker, then now that it's gone there are no more workers,
00961     // because we didn't just start another one in its place, then we failed.
00962     if (mWorkers.empty()) {
00963         mAllWorkersFailed = true;
00964     }
00965 }
00966 
00967 void VSocketConnectionStrategyThreadedRunner::_lockedForgetOneWorker(VSocketConnectionStrategyThreadedWorker* worker) {
00968     WorkerList::iterator position = std::find(mWorkers.begin(), mWorkers.end(), worker);
00969     if (position != mWorkers.end()) {
00970         mWorkers.erase(position);
00971     }
00972 }
00973 
00974 void VSocketConnectionStrategyThreadedRunner::_lockedForgetAllWorkers() {
00975     mWorkers.clear();
00976 }
00977 
00978 // VSocketConnectionStrategyThreaded ------------------------------------------
00979 
00980 VSocketConnectionStrategyThreaded::VSocketConnectionStrategyThreaded(const VDuration& timeoutInterval, int maxNumThreads)
00981     : VSocketConnectionStrategy()
00982     , mTimeoutInterval(timeoutInterval)
00983     , mMaxNumThreads(maxNumThreads)
00984     {
00985 }
00986 
00987 void VSocketConnectionStrategyThreaded::connect(const VString& hostName, int portNumber, VSocket& socketToConnect) const {
00988 
00989     VSocketConnectionStrategyThreadedRunner* runner = new VSocketConnectionStrategyThreadedRunner(mTimeoutInterval, mMaxNumThreads, hostName, portNumber, mDebugIPAddresses);
00990     runner->start();
00991 
00992     while (! runner->hasAnswer()) {
00993         VThread::sleep(VDuration::MILLISECOND());
00994     }
00995 
00996     VSocketID sockID = runner->getConnectedSockID();
00997     if (sockID == VSocket::kNoSocketID) {
00998         throw VException("VSocketConnectionStrategyThreaded::connect: Failed to connect to all addresses.");
00999     } else {
01000         socketToConnect.setSockID(sockID);
01001         socketToConnect.setHostIPAddressAndPort(runner->getConnectedIPAddress(), portNumber);
01002     }
01003 
01004     // Finally, let the runner know that it is safe for it to end because we are no longer referring to it.
01005     // It may still need to bookkeep worker threads that have not yet completed. It will self-delete later.
01006     runner->detachFromStrategy();
01007     runner = NULL;
01008 
01009     VLOGGER_TRACE(VSTRING_FORMAT("VSocketConnectionStrategyThreaded::connect(%s, %d) completed successfully at %s.", hostName.chars(), portNumber, socketToConnect.getHostIPAddress().chars()));
01010 }

Copyright ©1997-2014 Trygve Isaacson. All rights reserved. This documentation was generated with Doxygen.