00001 #define __NQ_INTERNAL_SW__
00002
00003 #include "PlatformSW.h"
00004 #include "NQDeclarations.h"
00005 #include "NQLogging.h"
00006 #include "NQPacketQueue.h"
00007 #include "NQMessagePool.h"
00008 #include "NQResponseWaitQueue.h"
00009 #include "NQPeerList.h"
00010 #include "NQclass.h"
00011 #include <assert.h>
00012 #include <time.h>
00013
00014 #define ERR(a) \
00015 { errcode = a; \
00016 goto err;}
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 CNetworkQueue::CNetworkQueue()
00057 {
00058 m_socket = INVALID_SOCKET;
00059 m_iLastError = ERR_NQ_NONE;
00060 m_iLastSystemError = 0;
00061 m_bSuccessfulInitialized = false;
00062
00063 m_AckPacketWaitingQueue.setQueueMaxsize(NQ_ACKQUEUE_SIZE);
00064 m_SendPacketQueue.setQueueMaxsize(NQ_SENDQUEUE_SIZE);
00065 m_ReceivePacketQueue.setQueueMaxsize(NQ_RECVQUEUE_SIZE);
00066 }
00067
00068
00069
00070
00071
00072
00073 CNetworkQueue::~CNetworkQueue()
00074 {
00075 if (m_bSuccessfulInitialized == true)
00076 {
00077
00078 close();
00079 }
00080 }
00081
00082
00083
00084
00085
00086
00087
00088
00094 CNQMessage *CNetworkQueue::createMsg()
00095 {
00096 return obtainReference();
00097 }
00098
00099 bool CNetworkQueue::freeMsg( CNQMessage *ref)
00100 {
00101 return putbackReference( ref );
00102 }
00103
00104
00105
00106 CNQMessage *CNetworkQueue::obtainReference()
00107 {
00108 assert( m_bSuccessfulInitialized == true );
00109
00110 return( bufs.acquire_reference() );
00111 }
00112
00113 bool CNetworkQueue::incrementReferenceCount( CNQMessage *ref )
00114 {
00115 assert( m_bSuccessfulInitialized == true );
00116
00117 return( bufs.increment_referencecount(ref) );
00118 }
00119
00120
00121 bool CNetworkQueue::putbackReference( CNQMessage *ref)
00122 {
00123 assert( m_bSuccessfulInitialized == true );
00124
00125 return( bufs.putback_reference(ref) );
00126 }
00127
00128
00129
00130
00144 bool CNetworkQueue::addPeer(UINT32 ip_hostorder, UINT32 port_hostorder)
00145 {
00146 assert( m_bSuccessfulInitialized == true );
00147
00148
00149 return m_PeerList.add(ip_hostorder, port_hostorder);
00150 }
00151
00152
00153
00154
00169 bool CNetworkQueue::removePeer(UINT32 ip_hostorder, UINT32 port_hostorder)
00170 {
00171 assert( m_bSuccessfulInitialized == true );
00172
00173 return m_PeerList.remove(ip_hostorder, port_hostorder);
00174 }
00175
00176
00177
00178
00213 bool CNetworkQueue::create( const ONECHAR *ip_remote_peer, UINT16 portnumber_remote, UINT16 portnumber_local )
00214 {
00215 if (m_bSuccessfulInitialized == true)
00216 {
00217
00218 m_iLastError = ERR_NQ_ALREADY;
00219 return false;
00220 }
00221
00222 INT32 errcode;
00223 INT32 ret;
00224 struct sockaddr_in LOC;
00225
00226 m_sendSemaphore = NULL;
00227 m_receiveSemaphore = NULL;
00228 m_hSendThread = NULL;
00229 m_hReceiveThread = NULL;
00230
00231 if (bufs.Initialize(NQ_NUM_CNQMESSAGEBUFFERS) == false) ERR(ERR_NQ_BADPOOL);
00232
00233 m_PeerList.removeAll();
00234
00235 if ((ip_remote_peer != NULL))
00236 {
00237 if (strlen(ip_remote_peer)!=0)
00238 {
00239 UINT32 ip_net = platformIPAddrConvert( ip_remote_peer );
00240
00241 if (ip_net == INADDR_NONE) ERR(ERR_NQ_BADIP);
00242 if (m_PeerList.add(ntohl(ip_net), portnumber_remote) == false) ERR(ERR_NQ_ADDPEER);
00243 }
00244 }
00245
00246 m_iNextRequestId = 1;
00247
00248
00249
00250 LOC.sin_family = AF_INET;
00251 LOC.sin_port = htons( portnumber_local );
00252 LOC.sin_addr.s_addr = INADDR_ANY;
00253
00254 m_socket = platformSocketCreateInet(SOCK_DGRAM);
00255 if (m_socket == SOCKET_ERROR) ERR(ERR_NQ_SOCREATE);
00256
00257 ret = platformSocketBind(m_socket, (const struct sockaddr *) &LOC, sizeof(LOC));
00258 if (ret == SOCKET_ERROR) ERR(ERR_NQ_SOBIND);
00259
00260
00261 ret = platformSocketSetNonblocking( m_socket );
00262 if (ret == SOCKET_ERROR) ERR(ERR_NQ_SONONBLOCK);
00263
00264
00265 memset( (void *) &m_stats, 0, sizeof(m_stats) );
00266
00267 m_stats.StartTime.tv_sec = time(NULL);
00268 m_stats.StartTime.tv_usec = 0;
00269
00270
00271
00272 m_sendSemaphore = platformCreateSemaphore(NQ_SENDQUEUE_SIZE);
00273 if (m_sendSemaphore == NULL) ERR(ERR_NQ_SEMCREATESEND);
00274
00275 m_receiveSemaphore = platformCreateSemaphore(NQ_RECVQUEUE_SIZE);
00276 if (m_receiveSemaphore == NULL) ERR(ERR_NQ_SEMCREATERECV);
00277
00278
00279 m_bShutdownSendThread = false;
00280 m_bSendThreadRunning = false;
00281
00282 m_hSendThread = platformCreateThread(true, this, NQ_SEND_THREAD_PRIORITY);
00283 if (m_hSendThread == NULL) ERR(ERR_NQ_THRDCREATESEND);
00284
00285 m_bShutdownReceiveThread = false;
00286 m_bReceiveThreadRunning = false;
00287
00288 m_hReceiveThread = platformCreateThread(false, this, NQ_RECV_THREAD_PRIORITY);
00289 if (m_hReceiveThread == NULL) ERR(ERR_NQ_THRDCREATERECV);
00290
00291 m_bSuccessfulInitialized = true;
00292
00293 return true;
00294
00295 err:
00296 NQ_DBG(("NQ::create: in error stream: errcode=%d\n",errcode));
00297 m_iLastError = (unsigned int) errcode;
00298
00299 if ((m_iLastError == ERR_NQ_SOCREATE) ||
00300 (m_iLastError == ERR_NQ_SOBIND) ||
00301 (m_iLastError == ERR_NQ_SONONBLOCK)) m_iLastSystemError = platformSocketGetLastError();
00302
00303 if (m_iLastError > ERR_NQ_SONONBLOCK) m_iLastSystemError = platformGetLastError();
00304
00305
00306 if (m_iLastError > ERR_NQ_THRDCREATESEND)
00307 {
00308
00309 m_bShutdownSendThread = true;
00310 while( m_bSendThreadRunning == true)
00311 {
00312 platformSleepMilliseconds(100);
00313 }
00314
00315 platformDeleteThread( m_hSendThread );
00316 }
00317
00318 if (m_iLastError > ERR_NQ_SEMCREATERECV)
00319 if (m_receiveSemaphore != NULL) platformDeleteSemaphore( m_receiveSemaphore );
00320
00321 if (m_iLastError > ERR_NQ_SEMCREATESEND)
00322 if (m_sendSemaphore != NULL) platformDeleteSemaphore( m_sendSemaphore);
00323
00324 if (m_iLastError > ERR_NQ_SOCREATE)
00325 {
00326 ret = platformSocketClose(m_socket);
00327 m_socket = INVALID_SOCKET;
00328 }
00329
00330 if (m_iLastError > ERR_NQ_BADPOOL)
00331 {
00332 bufs.Deinitialize();
00333 }
00334
00335
00336 return false;
00337 }
00338
00339
00340
00341
00342
00371 bool CNetworkQueue::close()
00372 {
00373 INT32 ret;
00374 INT32 errcode;
00375
00376 if (m_bSuccessfulInitialized == true)
00377 {
00378 m_bShutdownSendThread = true;
00379 m_bShutdownReceiveThread = true;
00380
00381 NQ_DBG(("Waiting for send and receive threads to shut down...\n"));
00382
00383 while( m_bSendThreadRunning == true)
00384 {
00385 platformSleepMilliseconds(100);
00386 }
00387
00388 while( m_bReceiveThreadRunning == true)
00389 {
00390 platformSleepMilliseconds(100);
00391 }
00392 NQ_DBG(("Success. Send and receive threads are shut down.\n"));
00393
00394 if (platformDeleteThread( m_hSendThread ) == false) ERR(ERR_NQ_THRDCLEANUP);
00395 if (platformDeleteThread( m_hReceiveThread ) == false) ERR(ERR_NQ_THRDCLEANUP);
00396
00397 ret = platformSocketClose(m_socket);
00398 if (ret == SOCKET_ERROR) ERR(ERR_NQ_SOCLOSE);
00399 m_socket = INVALID_SOCKET;
00400
00401
00402
00403 int cnt = m_SendPacketQueue.size();
00404 int i;
00405
00406 for (i=0;i<cnt;i++)
00407 {
00408 CNQMessage *tmp;
00409
00410 tmp = NULL;
00411 if (m_SendPacketQueue.pop_front(&tmp) == true)
00412 {
00413 putbackReference(tmp);
00414 }
00415 }
00416
00417 cnt = m_ReceivePacketQueue.size();
00418 for (i=0;i<cnt;i++)
00419 {
00420 CNQMessage *tmp;
00421
00422 tmp = NULL;
00423 if (m_ReceivePacketQueue.pop_front(&tmp) == true)
00424 {
00425 putbackReference(tmp);
00426 }
00427 }
00428
00429 cnt = m_ReceiveResponseSemaphorePacketQueue.size();
00430 for (i=0;i<cnt;i++)
00431 {
00432 CNQMessage *tmp;
00433
00434 tmp = NULL;
00435 if (m_ReceiveResponseSemaphorePacketQueue.pop_front(&tmp) == true)
00436 {
00437 putbackReference(tmp);
00438 }
00439 }
00440
00441
00442 if (m_sendSemaphore != NULL)
00443 if (platformDeleteSemaphore( m_sendSemaphore ) == false) ERR(ERR_NQ_SEMCLEANUP);
00444 if (m_receiveSemaphore != NULL)
00445 if (platformDeleteSemaphore( m_receiveSemaphore ) == false) ERR(ERR_NQ_SEMCLEANUP);
00446
00447 bufs.Deinitialize();
00448
00449 m_bSuccessfulInitialized = false;
00450
00451 }
00452 return true;
00453
00454 err:
00455 m_iLastError = (unsigned int) errcode;
00456 if (m_iLastError == ERR_NQ_SOCLOSE)
00457 m_iLastSystemError = platformSocketGetLastError();
00458 else
00459 m_iLastSystemError = platformGetLastError();
00460
00461 return false;
00462 }
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482 bool CNetworkQueue::sendMsgTo( CNQMessage *msg, UINT32 ip_hostorder, UINT16 port_hostorder)
00483 {
00484
00485 msg->internalPacketSpecialBits &= ~NQ_PACKET_BIT_WANT_ACK;
00486
00487 return sendMsgToInternal( ip_hostorder, port_hostorder, msg, NQ_NO_SEMAPHORE);
00488 }
00489
00490
00491
00492
00531 bool CNetworkQueue::sendMsgTo( CNQMessage *msg, UINT32 ip_hostorder, UINT16 port_hostorder, const NQ_TIMEVAL &timeout)
00532 {
00533 INT32 errcode;
00534 UINT32 msecs = timeout.tv_sec * 1000 + (timeout.tv_usec / 1000);
00535 bool retval;
00536 HANDLE ack_sem=NULL;
00537 bool bAckQueueRemovedTimedoutMessage;
00538
00539 assert( m_bSuccessfulInitialized == true );
00540 assert( msg != NULL );
00541
00542
00543 msg->internalPacketSpecialBits |= NQ_PACKET_BIT_WANT_ACK;
00544
00545 if (msg->check_int() != 0) ERR(ERR_NQ_MSGINVALID);
00546
00547 if (m_PeerList.isAllowed(ip_hostorder, port_hostorder) == false) ERR(ERR_NQ_INVALIDPEER);
00548
00549 if (msg->isRequest() == true)
00550 {
00551 msg->internalRequestID = m_iNextRequestId++;
00552 }
00553
00554 msg->message_sendto_ip_host = ip_hostorder;
00555 msg->message_sendto_port_host = port_hostorder;
00556
00557 ack_sem = platformCreateSemaphore(1);
00558 if (ack_sem == NULL) ERR(ERR_NQ_SEMCREATE);
00559 msg->notifySemaphoreOnReceiveAck = ack_sem;
00560
00561 incrementReferenceCount(msg);
00562 if (m_AckPacketWaitingQueue.push_back(msg) == false)
00563 {
00564 putbackReference(msg);
00565 ERR(ERR_NQ_ACKQUEUE);
00566 }
00567
00568
00569 incrementReferenceCount(msg);
00570 UINT32 error;
00571 if (m_SendPacketQueue.push_back( msg, error) == false)
00572 {
00573 putbackReference(msg);
00574 if (error == ERR_PQ_FULL)
00575 {
00576 m_stats.SntQueueFull++;
00577 ERR(ERR_NQ_SENDQUEUEFULL);
00578 }
00579 else
00580 {
00581 NQ_DBG(("SendMessageToBlocking: Sendqueue error: %d\n",error));
00582 m_stats.SntQueueErrors++;
00583 ERR(ERR_NQ_SENDQUEUE);
00584 }
00585 }
00586
00587 if (platformSignalSemaphore( m_sendSemaphore ) == false) ERR(ERR_NQ_SEMNOTIFY);
00588
00589 if (platformWaitForSemaphore(ack_sem, msecs) == true) retval = true; else retval = false;
00590
00591
00592
00593 bAckQueueRemovedTimedoutMessage = m_AckPacketWaitingQueue.find_and_remove_passed_elem( msg );
00594 assert(bAckQueueRemovedTimedoutMessage == true);
00595 putbackReference( msg );
00596
00597 if (retval == false)
00598 {
00599 m_stats.AckTimeout++;
00600 ERR(ERR_NQ_TIMEOUT);
00601 }
00602
00603 if (platformDeleteSemaphore( ack_sem ) == false) ERR(ERR_NQ_SEMDELETE);
00604
00605 return retval;
00606
00607 err:
00608 if ((errcode == ERR_NQ_SENDQUEUE) || (errcode == ERR_NQ_SENDQUEUEFULL))
00609 {
00610 if (m_AckPacketWaitingQueue.find_and_remove_passed_elem( msg ) == true)
00611 {
00612 putbackReference(msg);
00613 }
00614 }
00615
00616 if ((errcode == ERR_NQ_SENDQUEUEFULL) || (errcode == ERR_NQ_SENDQUEUE) || (errcode == ERR_NQ_ACKQUEUE) || (errcode == ERR_NQ_TIMEOUT) )
00617 {
00618 if (ack_sem != NULL) platformDeleteSemaphore( ack_sem );
00619 }
00620
00621 if (errcode != ERR_NQ_NONE) m_iLastError = (unsigned int) errcode;
00622
00623 return false;
00624 }
00625
00626
00627
00628
00629
00659 bool CNetworkQueue::sendMsgToInternal( UINT32 ip_hostorder, UINT16 port_hostorder, CNQMessage *msg, HANDLE recv_notify_sem)
00660 {
00661 INT32 errcode;
00662
00663 assert( m_bSuccessfulInitialized == true );
00664 assert( msg != NULL );
00665
00666 if (msg->check_int() != 0) ERR(ERR_NQ_MSGINVALID);
00667 if (m_PeerList.isAllowed(ip_hostorder, port_hostorder) == false) ERR(ERR_NQ_INVALIDPEER);
00668
00669
00670 msg->message_sendto_ip_host = ip_hostorder;
00671 msg->message_sendto_port_host = port_hostorder;
00672
00673 incrementReferenceCount(msg);
00674 UINT32 error;
00675 if (m_SendPacketQueue.push_back( msg, error) == false)
00676 {
00677 putbackReference(msg);
00678 if (error == ERR_PQ_FULL)
00679 {
00680 m_stats.SntQueueFull++;
00681 ERR(ERR_NQ_SENDQUEUEFULL);
00682 }
00683 else
00684 {
00685 NQ_DBG(("SendMessageInternal(): Sendqueue error: %d\n",error));
00686 m_stats.SntQueueErrors++;
00687 ERR(ERR_NQ_SENDQUEUE);
00688 }
00689 }
00690
00691 if (platformSignalSemaphore( m_sendSemaphore ) == false) ERR(ERR_NQ_SEMNOTIFY);
00692
00693 return true;
00694 err:
00695 m_iLastError = (unsigned int) errcode;
00696
00697 return false;
00698 }
00699
00700
00701
00702
00735 bool CNetworkQueue::recvMsg( CNQMessage **newmsg )
00736 {
00737 bool ret;
00738 INT32 errcode=0;
00739 UINT32 lasterr;
00740
00741 assert( m_bSuccessfulInitialized == true );
00742 assert( newmsg != NULL );
00743
00744 if (platformIsSemaphoreSignalled( m_receiveSemaphore ) == true)
00745 {
00746 ret = m_ReceivePacketQueue.pop_front( newmsg, lasterr );
00747 if (ret == false)
00748 {
00749 if (lasterr == ERR_PQ_NONE)
00750 {
00751 assert(26==34);
00752 ERR(ERR_NQ_EMPTY);
00753 }
00754 ERR(ERR_NQ_RECVQUEUE);
00755 }
00756 if ((*newmsg)->wantAckPacket() == true)
00757 {
00758 CNQMessage *ack = obtainReference();
00759 if (ack != NULL)
00760 {
00761 UINT32 ip_hostorder = (*newmsg)->message_came_from_ip;
00762 UINT16 port_hostorder = (*newmsg)->message_came_from_port;
00763
00764 ack->constructAckPacket( **newmsg );
00765
00766 if (sendMsgToInternal( ip_hostorder, port_hostorder, ack, NQ_NO_SEMAPHORE) == false)
00767 {
00768 NQ_DBG(("SendMessageinternal (for ack packet) failed (1).\n"));
00769 }
00770
00771 putbackReference(ack);
00772 }
00773 else assert( ack != NULL );
00774 }
00775
00776 return true;
00777 }
00778 ERR(ERR_NQ_EMPTY);
00779
00780 err:
00781 m_iLastError = (unsigned int) errcode;
00782
00783 return false;
00784 }
00785
00786
00787
00788
00827 bool CNetworkQueue::recvMsg( CNQMessage **newmsg, const NQ_TIMEVAL &timeout )
00828 {
00829 UINT32 timeout_msec;
00830 INT32 errcode=0;
00831
00832 assert( m_bSuccessfulInitialized == true );
00833 assert( newmsg != NULL );
00834
00835 timeout_msec = timeout.tv_sec * 1000 + (timeout.tv_usec / 1000);
00836
00837 if (platformWaitForSemaphore( m_receiveSemaphore, timeout_msec ) == true)
00838 {
00839 UINT32 lasterr;
00840 bool ret = m_ReceivePacketQueue.pop_front( newmsg, lasterr );
00841 if (ret == false)
00842 {
00843 if (lasterr == 0)
00844 {
00845 assert(26==34);
00846 ERR(ERR_NQ_EMPTY);
00847 }
00848 ERR(ERR_NQ_RECVQUEUE);
00849 }
00850
00851 if ((*newmsg)->wantAckPacket() == true)
00852 {
00853 CNQMessage *ack = obtainReference();
00854 if (ack != NULL)
00855 {
00856 UINT32 ip_hostorder = (*newmsg)->message_came_from_ip;
00857 UINT16 port_hostorder = (*newmsg)->message_came_from_port;
00858
00859 ack->constructAckPacket( **newmsg );
00860
00861 if (sendMsgToInternal(ip_hostorder, port_hostorder, ack, NQ_NO_SEMAPHORE) == false)
00862 {
00863 NQ_DBG(("SendMessageinternal (for ack packet) failed (2).\n"));
00864 }
00865
00866 putbackReference(ack);
00867 }
00868 else assert( ack != NULL );
00869 }
00870
00871 return true;
00872 }
00873 else
00874 m_iLastError = ERR_NQ_TIMEOUT;
00875
00876 return false;
00877
00878 err:
00879 m_iLastError = (unsigned int) errcode;
00880
00881 return false;
00882 }
00883
00884
00885
00886
00932 bool CNetworkQueue::sendRecvMsg(CNQMessage **response, CNQMessage *request, UINT32 ip_hostorder, UINT16 port_hostorder, const NQ_TIMEVAL &timeout)
00933 {
00934 bool retval = false;
00935
00936 INT32 errcode;
00937 UINT32 msecs = timeout.tv_sec * 1000 + (timeout.tv_usec / 1000);
00938 CNQMessage *l_response;
00939 bool pushToResponseWaitQueue;
00940
00941 assert( m_bSuccessfulInitialized == true );
00942 assert( request != NULL );
00943 assert( response != NULL );
00944
00945 if (request->isRequest() == false) return false;
00946
00947 request->internalRequestID = m_iNextRequestId++;
00948
00949 HANDLE recv_sem = platformCreateSemaphore(1);
00950 if (recv_sem == NULL) ERR(ERR_NQ_SEMCREATE);
00951
00952
00953 NQ_WAITFORRESPONSE resp;
00954 resp.semaphore_to_signal = recv_sem;
00955 resp.requestid = request->internalRequestID;
00956
00957 pushToResponseWaitQueue = m_ResponseWaitQueue.push_back( resp );
00958 if ( pushToResponseWaitQueue == false) ERR(ERR_NQ_RESPWAITQUEUE);
00959
00960
00961 if (sendMsgToInternal(ip_hostorder, port_hostorder, request, recv_sem) == false) ERR(0);
00962
00963 m_stats.BlockingCallRequestSent++;
00964
00965 if (platformWaitForSemaphore(recv_sem, msecs) == true)
00966 {
00967
00968 bool success = m_ReceiveResponseSemaphorePacketQueue.find_and_remove_elem(request->internalRequestID, response);
00969 if (success == true)
00970 {
00971 m_stats.BlockingCallResponseReceivedOnTime++;
00972 retval = true;
00973 }
00974 else ERR(ERR_NQ_RESPTROUBLE);
00975 }
00976 else
00977 {
00978
00979 bool killed = m_ResponseWaitQueue.remove_elem( request->internalRequestID );
00980 if (killed == false)
00981 {
00982
00983
00984
00985 bool success = m_ReceiveResponseSemaphorePacketQueue.find_and_remove_elem(request->internalRequestID, &l_response);
00986
00987 {
00988 if (success == true) NQ_DBG(("Response came just a bit too late!\n"));
00989 if (success == false) NQ_DBG(("Could not remove.\n"));
00990 }
00991
00992
00993
00994
00995 success=true;
00996
00997 }
00998 m_stats.BlockingCallResponseTimeout++;
00999 ERR(ERR_NQ_TIMEOUT);
01000
01001 }
01002
01003 if (platformDeleteSemaphore( recv_sem ) == false) ERR(ERR_NQ_SEMDELETE);
01004
01005 return retval;
01006
01007 err:
01008 if (errcode == ERR_NQ_NONE)
01009 {
01010 m_ResponseWaitQueue.remove_elem( request->internalRequestID );
01011 }
01012
01013 if ((errcode == ERR_NQ_TIMEOUT) || (errcode == ERR_NQ_RESPTROUBLE) || (errcode == ERR_NQ_RESPWAITQUEUE) || (errcode == ERR_NQ_NONE) )
01014 {
01015 if (recv_sem != NULL) platformDeleteSemaphore( recv_sem );
01016 }
01017
01018 if (errcode != ERR_NQ_NONE) m_iLastError = (unsigned int) errcode;
01019
01020 return false;
01021 }
01022
01023
01024
01039 bool CNetworkQueue::getStatistics( NQ_STATISTICS &stats )
01040 {
01041 stats = m_stats;
01042 return true;
01043 }
01044
01045
01046
01051
01052
01053 UINT32 CNetworkQueue::getLastError()
01054 {
01055 return m_iLastError;
01056 }
01057
01075 bool CNetworkQueue::getLastErrorString( ONECHAR *buffer, UINT32 buflength)
01076 {
01077
01078 ONECHAR os_additional_error[300];
01079 static ONECHAR last_error_buffer[200];
01080 static const ONECHAR classname[]="CNetworkQueue";
01081
01082 static const ONECHAR clserrors[ERR_NQ_NUMERRORS][72]=
01083 {
01084 "No error.",
01085 "The class was already initialized before (at InitializeQueue).",
01086 "Error creating CNQMessage pool (CNQMessageBuffers) at InitializeQueue..",
01087 "The dotted IP address is invalid at InitializeQueue.",
01088 "Could not create udp socket at InitializeQueue().",
01089 "Could not bind udp socket at InitializeQueue().",
01090 "Could not add peer to list (on InitializeQueue).",
01091 "Error putting socket into nonblocking mode at InitializeQueue().",
01092 "Could not create Send semaphore at InitializeQueue().",
01093 "Could not create Receive semaphore at InitializeQueue().",
01094 "Could not start send thread/task at InitializeQueue().",
01095 "Could not start receive thread/task at InitializeQueue().",
01096 "Could not cleanup tasks/threads at DeinitializeQueue().",
01097 "Could not close socket at DeinitializeQueue().",
01098 "Could not cleanup semaphores at DeinitializeQueue().",
01099 "The packet to be sent is not valid (SendMessage()).",
01100 "The sent queue is filled up at the moment (SendMessage()).",
01101 "Pushing the packet to send queue failed (SendMessage()).",
01102 "Semaphore could not be signalled (SendMessage()).",
01103 "Queue was empty (isNewMessageAvailable()).",
01104 "Error getting message out of nonempty queue (isNewMessageAvailable()).",
01105 "Timeout happened (isNewMessageAvailable or SendRequestAndWaitForResp.)",
01106 "Error creating temp. semaphore (Request/Response blocking).",
01107 "Through semaphore announced response was not found in dedicated queue!",
01108 "Temporary semaphore could not be deleted (Request/Response blocking).",
01109 "Invalid remote peer: (ip, port) is not contained in peer list.",
01110 "Internal error in 'ack' queue.",
01111 "Internal error in 'response wait' queue."
01112
01113 };
01114
01115 last_error_buffer[0] = 0;
01116 os_additional_error[0] = 0;
01117
01118 assert( buffer != NULL );
01119 assert( m_iLastError < ERR_NQ_NUMERRORS);
01120
01121 if ((m_iLastError == ERR_NQ_SOCREATE) ||
01122 (m_iLastError == ERR_NQ_SOBIND) ||
01123 (m_iLastError == ERR_NQ_SONONBLOCK) ||
01124 (m_iLastError == ERR_NQ_SOCLOSE))
01125 {
01126 sprintf((char *) last_error_buffer, "%s Error: %s\n(sys error was: %d -> \"%s\")",
01127 classname, clserrors[m_iLastError-ERR_NQ_BASE], m_iLastSystemError,
01128 platformGetOSErrorString(m_iLastSystemError, os_additional_error,
01129 sizeof(os_additional_error)) );
01130 }
01131 else
01132 {
01133 if (m_iLastError >= ERR_NQ_BASE && m_iLastError < ERR_NQ_NUMERRORS)
01134 {
01135
01136 sprintf((char *) last_error_buffer, "%s Error: %s", classname, clserrors[m_iLastError-ERR_NQ_BASE] );
01137 }
01138 else
01139 {
01140
01141 sprintf((char *) last_error_buffer, "%s Error: %s", classname, "Unknown internal error." );
01142 }
01143 }
01144
01145 UINT32 total_length = strlen((char *)last_error_buffer)+1;
01146 if (total_length > buflength) return false;
01147 memcpy(buffer, last_error_buffer, total_length);
01148
01149 return true;
01150 }
01151
01152
01153
01154
01155
01156
01157
01158
01159
01160 void CNetworkQueue::SendProcessTask()
01161 {
01162 UINT8 initial_buffer[NQ_MAX_PACKET_SIZE];
01163 CNQMessage *m_temp;
01164
01165 NQ_DBG(("SendTask: Start!\n"));
01166
01167 m_bSendThreadRunning = true;
01168 while( 1 == 1)
01169 {
01170
01171
01172
01173 bool ret = platformWaitForSemaphore(m_sendSemaphore, NQ_SEND_SEMAPHORE_TIMEOUT_MS);
01174
01175 if (ret == true)
01176 {
01177
01178
01179 bool ret_get = m_SendPacketQueue.pop_front(&m_temp);
01180 if (ret_get == false)
01181 {
01182 m_stats.SntQueueErrors++;
01183 continue;
01184 }
01185
01186 INT32 message_size = 0;
01187
01188
01189 bool success = m_PeerList.getSendPacketIndex(m_temp->message_sendto_ip_host,m_temp->message_sendto_port_host, m_temp->internalPacketIndex);
01190 assert( success == true);
01191 success = true;
01192
01193
01194 message_size = NQ_MAX_PACKET_SIZE;
01195 bool ret_enc = m_temp->ExportMessageToPacketBuffer(initial_buffer, message_size);
01196
01197
01198 if (ret_enc == false)
01199 {
01200 NQ_DBG(("SendTask: Encoding failed.\n"));
01201 m_stats.SntEncodingErrors++;
01202 putbackReference(m_temp);
01203 continue;
01204 }
01205
01206 INT32 still_to_write = message_size;
01207 UINT8 *bufptr = initial_buffer;
01208 bool abort=false;
01209
01210
01211
01212
01213
01214 INT32 accumsockettimeouts = 0;
01215
01216 while (abort == false)
01217 {
01218 struct timeval tmout;
01219 fd_set fdwset;
01220
01221 FD_ZERO( &fdwset );
01222 FD_SET( m_socket, &fdwset);
01223
01224 tmout.tv_sec = NQ_SEND_SELECT_TIMEOUT_MS / 1000;
01225 tmout.tv_usec = (NQ_SEND_SELECT_TIMEOUT_MS % 1000)*1000;
01226
01227 INT32 ret_select = platformSocketSelect(platformMaxFdSets, NULL, &fdwset, NULL, &tmout);
01228 if (ret_select != 1)
01229 {
01230 accumsockettimeouts++;
01231 if (accumsockettimeouts > 10)
01232 {
01233 abort = true;
01234 m_stats.SntSocketErrors++;
01235 }
01236 }
01237
01238
01239 INT32 ret_send = platformSocketSendToFunction(m_socket, bufptr, still_to_write,m_temp->message_sendto_ip_host,m_temp->message_sendto_port_host);
01240
01241 if (ret_send >= 0)
01242 {
01243 still_to_write -= ret_send;
01244 bufptr += ret_send;
01245
01246 if (still_to_write <= 0) abort = true;
01247 }
01248 else
01249 {
01250 if (ret_send == SOCKET_ERROR)
01251 {
01252 if (platformSocketGetLastError() != EAGAIN)
01253 {
01254 abort = true;
01255 m_stats.SntSocketErrors++;
01256 }
01257 }
01258 }
01259 }
01260
01261 if (still_to_write != 0)
01262 {
01263
01264
01265
01266
01267
01268
01269
01270
01271
01272
01273
01274
01275
01276 }
01277 else
01278 {
01279 m_PeerList.incSendPacketIndex(m_temp->message_sendto_ip_host,m_temp->message_sendto_port_host);
01280
01281 if (m_temp->isAckPacket() == true) m_stats.SntAckPackets++;
01282
01283 else if (m_temp->isTelegram() == true) m_stats.SntTelegrams++;
01284 else if (m_temp->isRequest() == true) m_stats.SntRequests++;
01285 else m_stats.SntResponses++;
01286
01287 }
01288 putbackReference(m_temp);
01289 }
01290
01291 if (m_bShutdownSendThread == true) break;
01292
01293 }
01294 m_bSendThreadRunning = false;
01295 NQ_DBG(("SendTask: At the very end of task.\n"));
01296 }
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308 void CNetworkQueue::ReceiveProcessTask()
01309 {
01310 UINT8 initial_buffer[NQ_MAX_PACKET_SIZE];
01311 CNQMessage *m_temp;
01312 UINT32 error;
01313 UINT32 ipfrom;
01314 UINT16 portfrom;
01315
01316 NQ_DBG(("ReceiveTask: Start!\n"));
01317
01318 m_bReceiveThreadRunning = true;
01319 while( 1 == 1)
01320 {
01321 struct timeval tmout;
01322 fd_set fdrset;
01323
01324 FD_ZERO( &fdrset );
01325 FD_SET( m_socket, &fdrset);
01326
01327 tmout.tv_sec = NQ_RECV_SELECT_TIMEOUT_MS / 1000;
01328 tmout.tv_usec = (NQ_RECV_SELECT_TIMEOUT_MS % 1000)*1000;
01329
01330 INT32 ret_select = platformSocketSelect(platformMaxFdSets, &fdrset, NULL, NULL, &tmout);
01331 if (m_bShutdownReceiveThread == true) break;
01332
01333 if (ret_select == 1)
01334 {
01335 INT32 ret_recv = platformSocketReceiveFromFunction(m_socket, initial_buffer, NQ_MAX_PACKET_SIZE, &ipfrom, &portfrom);
01336 if (ret_recv != SOCKET_ERROR)
01337 {
01338 if (ret_recv == 0)
01339 {
01340
01341 m_stats.RcvNoDataReceived++;
01342 continue;
01343 }
01344
01345 if (m_PeerList.isAllowed(ipfrom, portfrom) == false)
01346 {
01347 m_stats.RcvInvalidPeerAddress++;
01348 continue;
01349 }
01350
01351 m_temp = obtainReference();
01352 if (m_temp == NULL)
01353 {
01354 m_stats.RcvNoBuffersLeft++;
01355 continue;
01356 }
01357
01358 bool valid = m_temp->ImportMessageFromPacketBuffer(initial_buffer, ret_recv, ipfrom, portfrom );
01359 if (valid == false)
01360 {
01361 m_stats.RcvDroppedPacketsBadChecksum++;
01362 putbackReference(m_temp);
01363 continue;
01364 }
01365
01366
01367
01368
01369 {
01370 UINT16 last_packet_index;
01371 static UINT16 ret_recv_lpi;
01372
01373 if (m_PeerList.getReceivePacketIndex( ipfrom, portfrom, last_packet_index) == false)
01374 {
01375 bool success = m_PeerList.setReceivePacketIndex( ipfrom, portfrom, m_temp->internalPacketIndex);
01376 assert( success == true );
01377 success=true;
01378 ret_recv_lpi = ret_recv;
01379
01380 }
01381 else
01382 {
01383 INT32 lpi = (INT32) last_packet_index;
01384 INT32 cpi = (INT32) m_temp->internalPacketIndex;
01385 INT32 tmp;
01386
01387 tmp = ((cpi-lpi)&0xffff)-1;
01388 assert( tmp >= 0 );
01389
01390 if (tmp != 0)
01391 {
01392 m_stats.RcvPacketsMissedByIndex += tmp;
01393 NQ_DBG(("ReceiveTask: Missed %d (for big numbers: at least) packet(s) cpi=%d(size=%d); lpi=%d(size=%d) remotepeer=%s:%d\n", tmp, cpi, ret_recv, lpi, ret_recv_lpi, (char*)platformIPAddrConvert(ipfrom),portfrom));
01394 }
01395 ret_recv_lpi = ret_recv;
01396
01397
01398 bool success = m_PeerList.setReceivePacketIndex( ipfrom, portfrom, m_temp->internalPacketIndex);
01399 assert( success == true );
01400 success=true;
01401 }
01402 }
01403
01404 if (m_temp->isAckPacket() == true)
01405 {
01406 if (m_AckPacketWaitingQueue.find_elem_ack_and_signal( *m_temp ) == false)
01407 {
01408 NQ_DBG(("ReceiveTask: No waiting packet for ack found!"));
01409 }
01410
01411 m_stats.RcvAckPackets++;
01412
01413 putbackReference( m_temp );
01414 continue;
01415 }
01416
01417 incrementReferenceCount(m_temp);
01418
01419 if (m_temp->wantAckPacket())
01420 {
01421 m_stats.RcvNumPacketsWantAck++;
01422 }
01423
01424
01425 if (m_temp->isResponse() == true)
01426 {
01427
01428 NQ_WAITFORRESPONSE temp_wait;
01429
01430 m_stats.RcvNumResponses++;
01431
01432 bool found_evt = m_ResponseWaitQueue.find_and_remove_elem(m_temp->internalRequestID, temp_wait);
01433 if (found_evt == true)
01434 {
01435 if (m_ReceiveResponseSemaphorePacketQueue.push_back( m_temp, error ) == true)
01436 {
01437 m_stats.RcvResponsesWithSemaphore++;
01438 platformSignalSemaphore(temp_wait.semaphore_to_signal);
01439 }
01440 else
01441 {
01442 putbackReference(m_temp);
01443 if (error == 2)
01444 m_stats.RcvDroppedPacketsQueueFull++;
01445 else
01446 m_stats.RcvQueueErrors++;
01447 }
01448 }
01449 else
01450 {
01451 if (m_ReceivePacketQueue.push_back( m_temp, error ) == true)
01452 {
01453 m_stats.RcvResponsesWithoutSemaphore++;
01454
01455 platformSignalSemaphore( m_receiveSemaphore );
01456 }
01457 else
01458 {
01459 putbackReference(m_temp);
01460 if (error == 2)
01461 m_stats.RcvDroppedPacketsQueueFull++;
01462 else
01463 m_stats.RcvQueueErrors++;
01464 }
01465 }
01466
01467 }
01468 else
01469 {
01470
01471 if (m_ReceivePacketQueue.push_back( m_temp, error ) == true)
01472 {
01473 if (m_temp->isTelegram() == true) m_stats.RcvNumTelegrams++;
01474 else if (m_temp->isRequest() == true) m_stats.RcvNumRequests++;
01475 else assert(33==34);
01476
01477 platformSignalSemaphore( m_receiveSemaphore );
01478 }
01479 else
01480 {
01481 putbackReference(m_temp);
01482 if (error == 2)
01483 m_stats.RcvDroppedPacketsQueueFull++;
01484 else
01485 m_stats.RcvQueueErrors++;
01486 }
01487
01488 }
01489
01490 putbackReference(m_temp);
01491
01492 }
01493 else
01494 {
01495 if (platformSocketGetLastError() != EAGAIN) m_stats.RcvSocketErrors++;
01496 else NQ_DBG(("ReceiveTask: Eagain.\n"));
01497 }
01498 }
01499 else if (ret_select == SOCKET_ERROR) m_stats.RcvSelectErrors++;
01500
01501
01502 }
01503 m_bReceiveThreadRunning = false;
01504 NQ_DBG(("ReceiveTask: At the very end of task.\n"));
01505 }
01506
01507
01508
01509
01510
01511
01512
01513
01514
01515
01516
01517
01518
01519
01520
01521
01522
01523