00001 #include "Platform.h"
00002 #include "Declarations.h"
00003 #include "PacketQueue.h"
00004 #include "MessageBuffers.h"
00005 #include "ResponseWaitQueue.h"
00006 #include "PeerList.h"
00007 #include "NetworkQueue.h"
00008 #include <assert.h>
00009 #include <time.h>
00010
00011 #define ERR(a) \
00012 { errcode = a; \
00013 goto err;}
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 CNetworkQueue::CNetworkQueue()
00054 {
00055 m_socket = INVALID_SOCKET;
00056 m_iLastError = ERR_NQ_NONE;
00057 m_iLastSystemError = 0;
00058 m_bSuccessfulInitialized = false;
00059
00060 m_AckPacketWaitingQueue.setQueueMaxsize(ACKQUEUE_SIZE);
00061 m_SendPacketQueue.setQueueMaxsize(SENDQUEUE_SIZE);
00062 m_ReceivePacketQueue.setQueueMaxsize(RECVQUEUE_SIZE);
00063 }
00064
00065
00066
00067
00068
00069
00070 CNetworkQueue::~CNetworkQueue()
00071 {
00072 if (m_bSuccessfulInitialized == true)
00073 {
00074
00075 close();
00076 }
00077 }
00078
00079
00080
00081
00082
00083
00084
00085
00091 CMessage *CNetworkQueue::createMsg()
00092 {
00093 return obtainReference();
00094 }
00095
00096 bool CNetworkQueue::freeMsg( CMessage *ref)
00097 {
00098 return putbackReference( ref );
00099 }
00100
00101
00102
00103 CMessage *CNetworkQueue::obtainReference()
00104 {
00105 assert( m_bSuccessfulInitialized == true );
00106
00107 return( bufs.acquire_reference() );
00108 }
00109
00110 bool CNetworkQueue::incrementReferenceCount( CMessage *ref )
00111 {
00112 assert( m_bSuccessfulInitialized == true );
00113
00114 return( bufs.increment_referencecount(ref) );
00115 }
00116
00117
00118 bool CNetworkQueue::putbackReference( CMessage *ref)
00119 {
00120 assert( m_bSuccessfulInitialized == true );
00121
00122 return( bufs.putback_reference(ref) );
00123 }
00124
00125
00126
00127
00141 bool CNetworkQueue::addPeer(UINT32 ip_hostorder, UINT32 port_hostorder)
00142 {
00143 assert( m_bSuccessfulInitialized == true );
00144
00145
00146 return m_PeerList.add(ip_hostorder, port_hostorder);
00147 }
00148
00149
00150
00151
00166 bool CNetworkQueue::removePeer(UINT32 ip_hostorder, UINT32 port_hostorder)
00167 {
00168 assert( m_bSuccessfulInitialized == true );
00169
00170 return m_PeerList.remove(ip_hostorder, port_hostorder);
00171 }
00172
00173
00174
00175
00210 bool CNetworkQueue::create( const ONECHAR *ip_remote_peer, UINT16 portnumber_remote, UINT16 portnumber_local )
00211 {
00212 if (m_bSuccessfulInitialized == true)
00213 {
00214
00215 m_iLastError = ERR_NQ_ALREADY;
00216 return false;
00217 }
00218
00219 INT32 errcode;
00220 INT32 ret;
00221 struct sockaddr_in LOC;
00222
00223 m_sendSemaphore = NULL;
00224 m_receiveSemaphore = NULL;
00225 m_hSendThread = NULL;
00226 m_hReceiveThread = NULL;
00227
00228 if (bufs.Initialize(NUM_CMESSAGEBUFFERS) == false) ERR(ERR_NQ_BADPOOL);
00229
00230 m_PeerList.removeAll();
00231
00232 if ((ip_remote_peer != NULL))
00233 {
00234 if (strlen(ip_remote_peer)!=0)
00235 {
00236 UINT32 ip_net = platformIPAddrConvert( ip_remote_peer );
00237
00238 if (ip_net == INADDR_NONE) ERR(ERR_NQ_BADIP);
00239 if (m_PeerList.add(ntohl(ip_net), portnumber_remote) == false) ERR(ERR_NQ_ADDPEER);
00240 }
00241 }
00242
00243 m_iNextRequestId = 1;
00244
00245
00246
00247 LOC.sin_family = AF_INET;
00248 LOC.sin_port = htons( portnumber_local );
00249 LOC.sin_addr.s_addr = INADDR_ANY;
00250
00251 m_socket = platformSocketCreateInet(SOCK_DGRAM);
00252 if (m_socket == SOCKET_ERROR) ERR(ERR_NQ_SOCREATE);
00253
00254 ret = platformSocketBind(m_socket, (const struct sockaddr *) &LOC, sizeof(LOC));
00255 if (ret == SOCKET_ERROR) ERR(ERR_NQ_SOBIND);
00256
00257
00258 ret = platformSocketSetNonblocking( m_socket );
00259 if (ret == SOCKET_ERROR) ERR(ERR_NQ_SONONBLOCK);
00260
00261
00262 memset( (void *) &m_stats, 0, sizeof(m_stats) );
00263
00264 m_stats.StartTime.tv_sec = time(NULL);
00265 m_stats.StartTime.tv_usec = 0;
00266
00267
00268
00269 m_sendSemaphore = platformCreateSemaphore(SENDQUEUE_SIZE);
00270 if (m_sendSemaphore == NULL) ERR(ERR_NQ_SEMCREATESEND);
00271
00272 m_receiveSemaphore = platformCreateSemaphore(RECVQUEUE_SIZE);
00273 if (m_receiveSemaphore == NULL) ERR(ERR_NQ_SEMCREATERECV);
00274
00275
00276 m_bShutdownSendThread = false;
00277 m_bSendThreadRunning = false;
00278
00279 m_hSendThread = platformCreateThread(true, this, SEND_THREAD_PRIORITY);
00280 if (m_hSendThread == NULL) ERR(ERR_NQ_THRDCREATESEND);
00281
00282 m_bShutdownReceiveThread = false;
00283 m_bReceiveThreadRunning = false;
00284
00285 m_hReceiveThread = platformCreateThread(false, this, RECV_THREAD_PRIORITY);
00286 if (m_hReceiveThread == NULL) ERR(ERR_NQ_THRDCREATERECV);
00287
00288 m_bSuccessfulInitialized = true;
00289
00290 return true;
00291
00292 err:
00293 _DEBUG_(printf("NQ::create: in error stream: errcode=%d\n",errcode));
00294 m_iLastError = (unsigned int) errcode;
00295
00296 if ((m_iLastError == ERR_NQ_SOCREATE) ||
00297 (m_iLastError == ERR_NQ_SOBIND) ||
00298 (m_iLastError == ERR_NQ_SONONBLOCK)) m_iLastSystemError = platformSocketGetLastError();
00299
00300 if (m_iLastError > ERR_NQ_SONONBLOCK) m_iLastSystemError = platformGetLastError();
00301
00302
00303 if (m_iLastError > ERR_NQ_THRDCREATESEND)
00304 {
00305
00306 m_bShutdownSendThread = true;
00307 while( m_bSendThreadRunning == true)
00308 {
00309 platformSleepMilliseconds(100);
00310 }
00311
00312 platformDeleteThread( m_hSendThread );
00313 }
00314
00315 if (m_iLastError > ERR_NQ_SEMCREATERECV)
00316 if (m_receiveSemaphore != NULL) platformDeleteSemaphore( m_receiveSemaphore );
00317
00318 if (m_iLastError > ERR_NQ_SEMCREATESEND)
00319 if (m_sendSemaphore != NULL) platformDeleteSemaphore( m_sendSemaphore);
00320
00321 if (m_iLastError > ERR_NQ_SOCREATE)
00322 {
00323 ret = platformSocketClose(m_socket);
00324 m_socket = INVALID_SOCKET;
00325 }
00326
00327 if (m_iLastError > ERR_NQ_BADPOOL)
00328 {
00329 bufs.Deinitialize();
00330 }
00331
00332
00333 return false;
00334 }
00335
00336
00337
00338
00339
00368 bool CNetworkQueue::close()
00369 {
00370 INT32 ret;
00371 INT32 errcode;
00372
00373 if (m_bSuccessfulInitialized == true)
00374 {
00375 m_bShutdownSendThread = true;
00376 m_bShutdownReceiveThread = true;
00377
00378 _DEBUG_(printf("Waiting for send and receive threads to shut down...\n"));
00379
00380 while( m_bSendThreadRunning == true)
00381 {
00382 platformSleepMilliseconds(100);
00383 }
00384
00385 while( m_bReceiveThreadRunning == true)
00386 {
00387 platformSleepMilliseconds(100);
00388 }
00389 _DEBUG_(printf("Success. Send and receive threads are shut down.\n"));
00390
00391 if (platformDeleteThread( m_hSendThread ) == false) ERR(ERR_NQ_THRDCLEANUP);
00392 if (platformDeleteThread( m_hReceiveThread ) == false) ERR(ERR_NQ_THRDCLEANUP);
00393
00394 ret = platformSocketClose(m_socket);
00395 if (ret == SOCKET_ERROR) ERR(ERR_NQ_SOCLOSE);
00396 m_socket = INVALID_SOCKET;
00397
00398
00399
00400 int cnt = m_SendPacketQueue.size();
00401 int i;
00402
00403 for (i=0;i<cnt;i++)
00404 {
00405 CMessage *tmp;
00406
00407 tmp = NULL;
00408 if (m_SendPacketQueue.pop_front(&tmp) == true)
00409 {
00410 putbackReference(tmp);
00411 }
00412 }
00413
00414 cnt = m_ReceivePacketQueue.size();
00415 for (i=0;i<cnt;i++)
00416 {
00417 CMessage *tmp;
00418
00419 tmp = NULL;
00420 if (m_ReceivePacketQueue.pop_front(&tmp) == true)
00421 {
00422 putbackReference(tmp);
00423 }
00424 }
00425
00426 cnt = m_ReceiveResponseSemaphorePacketQueue.size();
00427 for (i=0;i<cnt;i++)
00428 {
00429 CMessage *tmp;
00430
00431 tmp = NULL;
00432 if (m_ReceiveResponseSemaphorePacketQueue.pop_front(&tmp) == true)
00433 {
00434 putbackReference(tmp);
00435 }
00436 }
00437
00438
00439 if (m_sendSemaphore != NULL)
00440 if (platformDeleteSemaphore( m_sendSemaphore ) == false) ERR(ERR_NQ_SEMCLEANUP);
00441 if (m_receiveSemaphore != NULL)
00442 if (platformDeleteSemaphore( m_receiveSemaphore ) == false) ERR(ERR_NQ_SEMCLEANUP);
00443
00444 bufs.Deinitialize();
00445
00446 m_bSuccessfulInitialized = false;
00447
00448 }
00449 return true;
00450
00451 err:
00452 m_iLastError = (unsigned int) errcode;
00453 if (m_iLastError == ERR_NQ_SOCLOSE)
00454 m_iLastSystemError = platformSocketGetLastError();
00455 else
00456 m_iLastSystemError = platformGetLastError();
00457
00458 return false;
00459 }
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479 bool CNetworkQueue::sendMsgTo( CMessage *msg, UINT32 ip_hostorder, UINT16 port_hostorder)
00480 {
00481
00482 msg->internalPacketSpecialBits &= ~PACKET_BIT_WANT_ACK;
00483
00484 return sendMsgToInternal( ip_hostorder, port_hostorder, msg, NO_SEMAPHORE);
00485 }
00486
00487
00488
00489
00528 bool CNetworkQueue::sendMsgTo( CMessage *msg, UINT32 ip_hostorder, UINT16 port_hostorder, const PI_TIMEVAL &timeout)
00529 {
00530 INT32 errcode;
00531 UINT32 msecs = timeout.tv_sec * 1000 + (timeout.tv_usec / 1000);
00532 bool retval;
00533 HANDLE ack_sem=NULL;
00534 bool bAckQueueRemovedTimedoutMessage;
00535
00536 assert( m_bSuccessfulInitialized == true );
00537 assert( msg != NULL );
00538
00539
00540 msg->internalPacketSpecialBits |= PACKET_BIT_WANT_ACK;
00541
00542 if (msg->check_int() != 0) ERR(ERR_NQ_MSGINVALID);
00543
00544 if (m_PeerList.isAllowed(ip_hostorder, port_hostorder) == false) ERR(ERR_NQ_INVALIDPEER);
00545
00546 if (msg->isRequest() == true)
00547 {
00548 msg->internalRequestID = m_iNextRequestId++;
00549 }
00550
00551 msg->message_sendto_ip_host = ip_hostorder;
00552 msg->message_sendto_port_host = port_hostorder;
00553
00554 ack_sem = platformCreateSemaphore(1);
00555 if (ack_sem == NULL) ERR(ERR_NQ_SEMCREATE);
00556 msg->notifySemaphoreOnReceiveAck = ack_sem;
00557
00558 incrementReferenceCount(msg);
00559 if (m_AckPacketWaitingQueue.push_back(msg) == false)
00560 {
00561 putbackReference(msg);
00562 ERR(ERR_NQ_ACKQUEUE);
00563 }
00564
00565
00566 incrementReferenceCount(msg);
00567 UINT32 error;
00568 if (m_SendPacketQueue.push_back( msg, error) == false)
00569 {
00570 putbackReference(msg);
00571 if (error == ERR_PQ_FULL)
00572 {
00573 m_stats.SntQueueFull++;
00574 ERR(ERR_NQ_SENDQUEUEFULL);
00575 }
00576 else
00577 {
00578 _DEBUG_(printf("SendMessageToBlocking: Sendqueue error: %d\n",error));
00579 m_stats.SntQueueErrors++;
00580 ERR(ERR_NQ_SENDQUEUE);
00581 }
00582 }
00583
00584 if (platformSignalSemaphore( m_sendSemaphore ) == false) ERR(ERR_NQ_SEMNOTIFY);
00585
00586 if (platformWaitForSemaphore(ack_sem, msecs) == true) retval = true; else retval = false;
00587
00588
00589
00590 bAckQueueRemovedTimedoutMessage = m_AckPacketWaitingQueue.find_and_remove_passed_elem( msg );
00591 assert(bAckQueueRemovedTimedoutMessage == true);
00592 putbackReference( msg );
00593
00594 if (retval == false)
00595 {
00596 m_stats.AckTimeout++;
00597 ERR(ERR_NQ_TIMEOUT);
00598 }
00599
00600 if (platformDeleteSemaphore( ack_sem ) == false) ERR(ERR_NQ_SEMDELETE);
00601
00602 return retval;
00603
00604 err:
00605 if ((errcode == ERR_NQ_SENDQUEUE) || (errcode == ERR_NQ_SENDQUEUEFULL))
00606 {
00607 if (m_AckPacketWaitingQueue.find_and_remove_passed_elem( msg ) == true)
00608 {
00609 putbackReference(msg);
00610 }
00611 }
00612
00613 if ((errcode == ERR_NQ_SENDQUEUEFULL) || (errcode == ERR_NQ_SENDQUEUE) || (errcode == ERR_NQ_ACKQUEUE) || (errcode == ERR_NQ_TIMEOUT) )
00614 {
00615 if (ack_sem != NULL) platformDeleteSemaphore( ack_sem );
00616 }
00617
00618 if (errcode != ERR_NQ_NONE) m_iLastError = (unsigned int) errcode;
00619
00620 return false;
00621 }
00622
00623
00624
00625
00626
00656 bool CNetworkQueue::sendMsgToInternal( UINT32 ip_hostorder, UINT16 port_hostorder, CMessage *msg, HANDLE recv_notify_sem)
00657 {
00658 INT32 errcode;
00659
00660 assert( m_bSuccessfulInitialized == true );
00661 assert( msg != NULL );
00662
00663 if (msg->check_int() != 0) ERR(ERR_NQ_MSGINVALID);
00664 if (m_PeerList.isAllowed(ip_hostorder, port_hostorder) == false) ERR(ERR_NQ_INVALIDPEER);
00665
00666
00667 msg->message_sendto_ip_host = ip_hostorder;
00668 msg->message_sendto_port_host = port_hostorder;
00669
00670 incrementReferenceCount(msg);
00671 UINT32 error;
00672 if (m_SendPacketQueue.push_back( msg, error) == false)
00673 {
00674 putbackReference(msg);
00675 if (error == ERR_PQ_FULL)
00676 {
00677 m_stats.SntQueueFull++;
00678 ERR(ERR_NQ_SENDQUEUEFULL);
00679 }
00680 else
00681 {
00682 _DEBUG_(printf("SendMessageInternal(): Sendqueue error: %d\n",error));
00683 m_stats.SntQueueErrors++;
00684 ERR(ERR_NQ_SENDQUEUE);
00685 }
00686 }
00687
00688 if (platformSignalSemaphore( m_sendSemaphore ) == false) ERR(ERR_NQ_SEMNOTIFY);
00689
00690 return true;
00691 err:
00692 m_iLastError = (unsigned int) errcode;
00693
00694 return false;
00695 }
00696
00697
00698
00699
00732 bool CNetworkQueue::recvMsg( CMessage **newmsg )
00733 {
00734 bool ret;
00735 INT32 errcode=0;
00736 UINT32 lasterr;
00737
00738 assert( m_bSuccessfulInitialized == true );
00739 assert( newmsg != NULL );
00740
00741 if (platformIsSemaphoreSignalled( m_receiveSemaphore ) == true)
00742 {
00743 ret = m_ReceivePacketQueue.pop_front( newmsg, lasterr );
00744 if (ret == false)
00745 {
00746 if (lasterr == ERR_PQ_NONE)
00747 {
00748 assert(26==34);
00749 ERR(ERR_NQ_EMPTY);
00750 }
00751 ERR(ERR_NQ_RECVQUEUE);
00752 }
00753 if ((*newmsg)->wantAckPacket() == true)
00754 {
00755 CMessage *ack = obtainReference();
00756 if (ack != NULL)
00757 {
00758 UINT32 ip_hostorder = (*newmsg)->message_came_from_ip;
00759 UINT16 port_hostorder = (*newmsg)->message_came_from_port;
00760
00761 ack->constructAckPacket( **newmsg );
00762
00763 if (sendMsgToInternal( ip_hostorder, port_hostorder, ack, NO_SEMAPHORE) == false)
00764 {
00765 _DEBUG_(printf("SendMessageinternal (for ack packet) failed (1).\n"));
00766 }
00767
00768 putbackReference(ack);
00769 }
00770 else assert( ack != NULL );
00771 }
00772
00773 return true;
00774 }
00775 ERR(ERR_NQ_EMPTY);
00776
00777 err:
00778 m_iLastError = (unsigned int) errcode;
00779
00780 return false;
00781 }
00782
00783
00784
00785
00824 bool CNetworkQueue::recvMsg( CMessage **newmsg, const PI_TIMEVAL &timeout )
00825 {
00826 UINT32 timeout_msec;
00827 INT32 errcode=0;
00828
00829 assert( m_bSuccessfulInitialized == true );
00830 assert( newmsg != NULL );
00831
00832 timeout_msec = timeout.tv_sec * 1000 + (timeout.tv_usec / 1000);
00833
00834 if (platformWaitForSemaphore( m_receiveSemaphore, timeout_msec ) == true)
00835 {
00836 UINT32 lasterr;
00837 bool ret = m_ReceivePacketQueue.pop_front( newmsg, lasterr );
00838 if (ret == false)
00839 {
00840 if (lasterr == 0)
00841 {
00842 assert(26==34);
00843 ERR(ERR_NQ_EMPTY);
00844 }
00845 ERR(ERR_NQ_RECVQUEUE);
00846 }
00847
00848 if ((*newmsg)->wantAckPacket() == true)
00849 {
00850 CMessage *ack = obtainReference();
00851 if (ack != NULL)
00852 {
00853 UINT32 ip_hostorder = (*newmsg)->message_came_from_ip;
00854 UINT16 port_hostorder = (*newmsg)->message_came_from_port;
00855
00856 ack->constructAckPacket( **newmsg );
00857
00858 if (sendMsgToInternal(ip_hostorder, port_hostorder, ack, NO_SEMAPHORE) == false)
00859 {
00860 _DEBUG_(printf("SendMessageinternal (for ack packet) failed (2).\n"));
00861 }
00862
00863 putbackReference(ack);
00864 }
00865 else assert( ack != NULL );
00866 }
00867
00868 return true;
00869 }
00870 else
00871 m_iLastError = ERR_NQ_TIMEOUT;
00872
00873 return false;
00874
00875 err:
00876 m_iLastError = (unsigned int) errcode;
00877
00878 return false;
00879 }
00880
00881
00882
00883
00929 bool CNetworkQueue::sendRecvMsg(CMessage **response, CMessage *request, UINT32 ip_hostorder, UINT16 port_hostorder, const PI_TIMEVAL &timeout)
00930 {
00931 bool retval = false;
00932
00933 INT32 errcode;
00934 UINT32 msecs = timeout.tv_sec * 1000 + (timeout.tv_usec / 1000);
00935 CMessage *l_response;
00936 bool pushToResponseWaitQueue;
00937
00938 assert( m_bSuccessfulInitialized == true );
00939 assert( request != NULL );
00940 assert( response != NULL );
00941
00942 if (request->isRequest() == false) return false;
00943
00944 request->internalRequestID = m_iNextRequestId++;
00945
00946 HANDLE recv_sem = platformCreateSemaphore(1);
00947 if (recv_sem == NULL) ERR(ERR_NQ_SEMCREATE);
00948
00949
00950 WAITFORRESPONSE resp;
00951 resp.semaphore_to_signal = recv_sem;
00952 resp.requestid = request->internalRequestID;
00953
00954 pushToResponseWaitQueue = m_ResponseWaitQueue.push_back( resp );
00955 if ( pushToResponseWaitQueue == false) ERR(ERR_NQ_RESPWAITQUEUE);
00956
00957
00958 if (sendMsgToInternal(ip, port, request, recv_sem) == false) ERR(0);
00959
00960 m_stats.BlockingCallRequestSent++;
00961
00962 if (platformWaitForSemaphore(recv_sem, msecs) == true)
00963 {
00964
00965 bool success = m_ReceiveResponseSemaphorePacketQueue.find_and_remove_elem(request->internalRequestID, response);
00966 if (success == true)
00967 {
00968 m_stats.BlockingCallResponseReceivedOnTime++;
00969 retval = true;
00970 }
00971 else ERR(ERR_NQ_RESPTROUBLE);
00972 }
00973 else
00974 {
00975
00976 bool killed = m_ResponseWaitQueue.remove_elem( request->internalRequestID );
00977 if (killed == false)
00978 {
00979
00980
00981
00982 bool success = m_ReceiveResponseSemaphorePacketQueue.find_and_remove_elem(request->internalRequestID, &l_response);
00983
00984 _DEBUG_(if (success == true) printf("Response came just a bit too late!\n"); else printf("Could not remove.\n"););
00985
00986 success=true;
00987
00988 }
00989 m_stats.BlockingCallResponseTimeout++;
00990 ERR(ERR_NQ_TIMEOUT);
00991
00992 }
00993
00994 if (platformDeleteSemaphore( recv_sem ) == false) ERR(ERR_NQ_SEMDELETE);
00995
00996 return retval;
00997
00998 err:
00999 if (errcode == ERR_NQ_NONE)
01000 {
01001 m_ResponseWaitQueue.remove_elem( request->internalRequestID );
01002 }
01003
01004 if ((errcode == ERR_NQ_TIMEOUT) || (errcode == ERR_NQ_RESPTROUBLE) || (errcode == ERR_NQ_RESPWAITQUEUE) || (errcode == ERR_NQ_NONE) )
01005 {
01006 if (recv_sem != NULL) platformDeleteSemaphore( recv_sem );
01007 }
01008
01009 if (errcode != ERR_NQ_NONE) m_iLastError = (unsigned int) errcode;
01010
01011 return false;
01012 }
01013
01014
01015
01030 bool CNetworkQueue::getStatistics( STATISTICS &stats )
01031 {
01032 stats = m_stats;
01033 return true;
01034 }
01035
01036
01037
01042
01043
01044 UINT32 CNetworkQueue::getLastError()
01045 {
01046 return m_iLastError;
01047 }
01048
01066 bool CNetworkQueue::getLastErrorString( ONECHAR *buffer, UINT32 buflength)
01067 {
01068
01069 ONECHAR os_additional_error[300];
01070 static ONECHAR last_error_buffer[200];
01071 static const ONECHAR classname[]="CNetworkQueue";
01072
01073 static const ONECHAR clserrors[ERR_NQ_NUMERRORS][72]=
01074 {
01075 "No error.",
01076 "The class was already initialized before (at InitializeQueue).",
01077 "Error creating CMessage pool (CMessageBuffers) at InitializeQueue..",
01078 "The dotted IP address is invalid at InitializeQueue.",
01079 "Could not create udp socket at InitializeQueue().",
01080 "Could not bind udp socket at InitializeQueue().",
01081 "Could not add peer to list (on InitializeQueue).",
01082 "Error putting socket into nonblocking mode at InitializeQueue().",
01083 "Could not create Send semaphore at InitializeQueue().",
01084 "Could not create Receive semaphore at InitializeQueue().",
01085 "Could not start send thread/task at InitializeQueue().",
01086 "Could not start receive thread/task at InitializeQueue().",
01087 "Could not cleanup tasks/threads at DeinitializeQueue().",
01088 "Could not close socket at DeinitializeQueue().",
01089 "Could not cleanup semaphores at DeinitializeQueue().",
01090 "The packet to be sent is not valid (SendMessage()).",
01091 "The sent queue is filled up at the moment (SendMessage()).",
01092 "Pushing the packet to send queue failed (SendMessage()).",
01093 "Semaphore could not be signalled (SendMessage()).",
01094 "Queue was empty (isNewMessageAvailable()).",
01095 "Error getting message out of nonempty queue (isNewMessageAvailable()).",
01096 "Timeout happened (isNewMessageAvailable or SendRequestAndWaitForResp.)",
01097 "Error creating temp. semaphore (Request/Response blocking).",
01098 "Through semaphore announced response was not found in dedicated queue!",
01099 "Temporary semaphore could not be deleted (Request/Response blocking).",
01100 "Invalid remote peer: (ip, port) is not contained in peer list.",
01101 "Internal error in 'ack' queue.",
01102 "Internal error in 'response wait' queue."
01103
01104 };
01105
01106 last_error_buffer[0] = 0;
01107 os_additional_error[0] = 0;
01108
01109 assert( buffer != NULL );
01110 assert( m_iLastError < ERR_NQ_NUMERRORS);
01111
01112 if ((m_iLastError == ERR_NQ_SOCREATE) ||
01113 (m_iLastError == ERR_NQ_SOBIND) ||
01114 (m_iLastError == ERR_NQ_SONONBLOCK) ||
01115 (m_iLastError == ERR_NQ_SOCLOSE))
01116 {
01117 sprintf((char *) last_error_buffer, "%s Error: %s\n(sys error was: %d -> \"%s\")",
01118 classname, clserrors[m_iLastError-ERR_NQ_BASE], m_iLastSystemError,
01119 platformGetOSErrorString(m_iLastSystemError, os_additional_error,
01120 sizeof(os_additional_error)) );
01121 }
01122 else
01123 {
01124 if (m_iLastError >= ERR_NQ_BASE && m_iLastError < ERR_NQ_NUMERRORS)
01125 {
01126
01127 sprintf((char *) last_error_buffer, "%s Error: %s", classname, clserrors[m_iLastError-ERR_NQ_BASE] );
01128 }
01129 else
01130 {
01131
01132 sprintf((char *) last_error_buffer, "%s Error: %s", classname, "Unknown internal error." );
01133 }
01134 }
01135
01136 UINT32 total_length = strlen((char *)last_error_buffer)+1;
01137 if (total_length > buflength) return false;
01138 memcpy(buffer, last_error_buffer, total_length);
01139
01140 return true;
01141 }
01142
01143
01144
01145
01146
01147
01148
01149
01150
01151 void CNetworkQueue::SendProcessTask()
01152 {
01153 UINT8 initial_buffer[MAX_PACKET_SIZE];
01154 CMessage *m_temp;
01155
01156 _DEBUG_(printf("SendTask: Start!\n"));
01157
01158 m_bSendThreadRunning = true;
01159 while( 1 == 1)
01160 {
01161
01162
01163
01164 bool ret = platformWaitForSemaphore(m_sendSemaphore, SEND_SEMAPHORE_TIMEOUT_MS);
01165
01166 if (ret == true)
01167 {
01168
01169
01170 bool ret_get = m_SendPacketQueue.pop_front(&m_temp);
01171 if (ret_get == false)
01172 {
01173 m_stats.SntQueueErrors++;
01174 continue;
01175 }
01176
01177 INT32 message_size = 0;
01178
01179
01180 bool success = m_PeerList.getSendPacketIndex(m_temp->message_sendto_ip_host,m_temp->message_sendto_port_host, m_temp->internalPacketIndex);
01181 assert( success == true);
01182 success = true;
01183
01184
01185 message_size = MAX_PACKET_SIZE;
01186 bool ret_enc = m_temp->ExportMessageToPacketBuffer(initial_buffer, message_size);
01187
01188
01189 if (ret_enc == false)
01190 {
01191 _DEBUG_(printf("SendTask: Encoding failed.\n"));
01192 m_stats.SntEncodingErrors++;
01193 putbackReference(m_temp);
01194 continue;
01195 }
01196
01197 INT32 still_to_write = message_size;
01198 UINT8 *bufptr = initial_buffer;
01199 bool abort=false;
01200
01201
01202
01203
01204
01205 INT32 accumsockettimeouts = 0;
01206
01207 while (abort == false)
01208 {
01209 struct timeval tmout;
01210 fd_set fdwset;
01211
01212 FD_ZERO( &fdwset );
01213 FD_SET( m_socket, &fdwset);
01214
01215 tmout.tv_sec = SEND_SELECT_TIMEOUT_MS / 1000;
01216 tmout.tv_usec = (SEND_SELECT_TIMEOUT_MS % 1000)*1000;
01217
01218 INT32 ret_select = platformSocketSelect(platformMaxFdSets, NULL, &fdwset, NULL, &tmout);
01219 if (ret_select != 1)
01220 {
01221 accumsockettimeouts++;
01222 if (accumsockettimeouts > 10)
01223 {
01224 abort = true;
01225 m_stats.SntSocketErrors++;
01226 }
01227 }
01228
01229
01230 INT32 ret_send = platformSocketSendToFunction(m_socket, bufptr, still_to_write,m_temp->message_sendto_ip_host,m_temp->message_sendto_port_host);
01231
01232 if (ret_send >= 0)
01233 {
01234 still_to_write -= ret_send;
01235 bufptr += ret_send;
01236
01237 if (still_to_write <= 0) abort = true;
01238 }
01239 else
01240 {
01241 if (ret_send == SOCKET_ERROR)
01242 {
01243 if (platformSocketGetLastError() != EAGAIN)
01244 {
01245 abort = true;
01246 m_stats.SntSocketErrors++;
01247 }
01248 }
01249 }
01250 }
01251
01252 if (still_to_write != 0)
01253 {
01254
01255
01256
01257
01258
01259
01260
01261
01262
01263
01264
01265
01266
01267 }
01268 else
01269 {
01270 m_PeerList.incSendPacketIndex(m_temp->message_sendto_ip_host,m_temp->message_sendto_port_host);
01271
01272 if (m_temp->isAckPacket() == true) m_stats.SntAckPackets++;
01273
01274 else if (m_temp->isTelegram() == true) m_stats.SntTelegrams++;
01275 else if (m_temp->isRequest() == true) m_stats.SntRequests++;
01276 else m_stats.SntResponses++;
01277
01278 }
01279 putbackReference(m_temp);
01280 }
01281
01282 if (m_bShutdownSendThread == true) break;
01283
01284 }
01285 m_bSendThreadRunning = false;
01286 _DEBUG_(printf("SendTask: At the very end of task.\n"));
01287 }
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299 void CNetworkQueue::ReceiveProcessTask()
01300 {
01301 UINT8 initial_buffer[MAX_PACKET_SIZE];
01302 CMessage *m_temp;
01303 UINT32 error;
01304 UINT32 ipfrom;
01305 UINT16 portfrom;
01306
01307 _DEBUG_(printf("ReceiveTask: Start!\n"));
01308
01309 m_bReceiveThreadRunning = true;
01310 while( 1 == 1)
01311 {
01312 struct timeval tmout;
01313 fd_set fdrset;
01314
01315 FD_ZERO( &fdrset );
01316 FD_SET( m_socket, &fdrset);
01317
01318 tmout.tv_sec = RECV_SELECT_TIMEOUT_MS / 1000;
01319 tmout.tv_usec = (RECV_SELECT_TIMEOUT_MS % 1000)*1000;
01320
01321 INT32 ret_select = platformSocketSelect(platformMaxFdSets, &fdrset, NULL, NULL, &tmout);
01322 if (m_bShutdownReceiveThread == true) break;
01323
01324 if (ret_select == 1)
01325 {
01326 INT32 ret_recv = platformSocketReceiveFromFunction(m_socket, initial_buffer, MAX_PACKET_SIZE, ipfrom, portfrom);
01327 if (ret_recv != SOCKET_ERROR)
01328 {
01329 if (ret_recv == 0)
01330 {
01331
01332 m_stats.RcvNoDataReceived++;
01333 continue;
01334 }
01335
01336 if (m_PeerList.isAllowed(ipfrom, portfrom) == false)
01337 {
01338 m_stats.RcvInvalidPeerAddress++;
01339 continue;
01340 }
01341
01342 m_temp = obtainReference();
01343 if (m_temp == NULL)
01344 {
01345 m_stats.RcvNoBuffersLeft++;
01346 continue;
01347 }
01348
01349 bool valid = m_temp->ImportMessageFromPacketBuffer(initial_buffer, ret_recv, ipfrom, portfrom );
01350 if (valid == false)
01351 {
01352 m_stats.RcvDroppedPacketsBadChecksum++;
01353 putbackReference(m_temp);
01354 continue;
01355 }
01356
01357
01358
01359
01360 {
01361 UINT16 last_packet_index;
01362 static UINT16 ret_recv_lpi;
01363
01364 if (m_PeerList.getReceivePacketIndex( ipfrom, portfrom, last_packet_index) == false)
01365 {
01366 bool success = m_PeerList.setReceivePacketIndex( ipfrom, portfrom, m_temp->internalPacketIndex);
01367 assert( success == true );
01368 success=true;
01369 ret_recv_lpi = ret_recv;
01370
01371 }
01372 else
01373 {
01374 INT32 lpi = (INT32) last_packet_index;
01375 INT32 cpi = (INT32) m_temp->internalPacketIndex;
01376 INT32 tmp;
01377
01378 tmp = ((cpi-lpi)&0xffff)-1;
01379 assert( tmp >= 0 );
01380
01381 if (tmp != 0)
01382 {
01383 m_stats.RcvPacketsMissedByIndex += tmp;
01384 _DEBUG_(printf("ReceiveTask: Missed %d (for big numbers: at least) packet(s) cpi=%d(size=%d); lpi=%d(size=%d) remotepeer=%s:%d\n", tmp, cpi, ret_recv, lpi, ret_recv_lpi, (char*)platformIPAddrConvert(ipfrom),portfrom));
01385 }
01386 ret_recv_lpi = ret_recv;
01387
01388
01389 bool success = m_PeerList.setReceivePacketIndex( ipfrom, portfrom, m_temp->internalPacketIndex);
01390 assert( success == true );
01391 success=true;
01392 }
01393 }
01394
01395 if (m_temp->isAckPacket() == true)
01396 {
01397 if (m_AckPacketWaitingQueue.find_elem_ack_and_signal( *m_temp ) == false)
01398 {
01399 _DEBUG_(printf("ReceiveTask: No waiting packet for ack found!\n"));
01400 }
01401
01402 m_stats.RcvAckPackets++;
01403
01404 putbackReference( m_temp );
01405 continue;
01406 }
01407
01408 incrementReferenceCount(m_temp);
01409
01410 if (m_temp->wantAckPacket())
01411 {
01412 m_stats.RcvNumPacketsWantAck++;
01413 }
01414
01415
01416 if (m_temp->isResponse() == true)
01417 {
01418
01419 WAITFORRESPONSE temp_wait;
01420
01421 m_stats.RcvNumResponses++;
01422
01423 bool found_evt = m_ResponseWaitQueue.find_and_remove_elem(m_temp->internalRequestID, temp_wait);
01424 if (found_evt == true)
01425 {
01426 if (m_ReceiveResponseSemaphorePacketQueue.push_back( m_temp, error ) == true)
01427 {
01428 m_stats.RcvResponsesWithSemaphore++;
01429 platformSignalSemaphore(temp_wait.semaphore_to_signal);
01430 }
01431 else
01432 {
01433 putbackReference(m_temp);
01434 if (error == 2)
01435 m_stats.RcvDroppedPacketsQueueFull++;
01436 else
01437 m_stats.RcvQueueErrors++;
01438 }
01439 }
01440 else
01441 {
01442 if (m_ReceivePacketQueue.push_back( m_temp, error ) == true)
01443 {
01444 m_stats.RcvResponsesWithoutSemaphore++;
01445
01446 platformSignalSemaphore( m_receiveSemaphore );
01447 }
01448 else
01449 {
01450 putbackReference(m_temp);
01451 if (error == 2)
01452 m_stats.RcvDroppedPacketsQueueFull++;
01453 else
01454 m_stats.RcvQueueErrors++;
01455 }
01456 }
01457
01458 }
01459 else
01460 {
01461
01462 if (m_ReceivePacketQueue.push_back( m_temp, error ) == true)
01463 {
01464 if (m_temp->isTelegram() == true) m_stats.RcvNumTelegrams++;
01465 else if (m_temp->isRequest() == true) m_stats.RcvNumRequests++;
01466 else assert(33==34);
01467
01468 platformSignalSemaphore( m_receiveSemaphore );
01469 }
01470 else
01471 {
01472 putbackReference(m_temp);
01473 if (error == 2)
01474 m_stats.RcvDroppedPacketsQueueFull++;
01475 else
01476 m_stats.RcvQueueErrors++;
01477 }
01478
01479 }
01480
01481 putbackReference(m_temp);
01482
01483 }
01484 else
01485 {
01486 if (platformSocketGetLastError() != EAGAIN) m_stats.RcvSocketErrors++;
01487 _DEBUG_(else printf("ReceiveTask: Eagain.\n"));
01488 }
01489 }
01490 else if (ret_select == SOCKET_ERROR) m_stats.RcvSelectErrors++;
01491
01492
01493 }
01494 m_bReceiveThreadRunning = false;
01495 _DEBUG_(printf("ReceiveTask: At the very end of task.\n"));
01496 }
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508
01509
01510
01511
01512
01513