00001 #define __NQ_INTERNAL_SW__ // we need internal functions in here!
00002
00003 #include "NQ.h"
00004
00005 void printStatistics( NQ_STATISTICS stats )
00006 {
00007 printf("Statistics output:\n\n");
00008 printf("Receive Task:\n");
00009 printf(" SocketErrors\t\t\t%u\n", stats.RcvSocketErrors);
00010 printf(" SelectErrors\t\t\t%u\n", stats.RcvSelectErrors);
00011 printf(" NoDataRcvdErr\t\t\t%u\n", stats.RcvNoDataReceived);
00012 printf(" NoBuffersLeft\t\t\t%u\n", stats.RcvNoBuffersLeft);
00013 printf(" InvalidPeerAddress\t\t%u\n", stats.RcvInvalidPeerAddress);
00014 printf("\n");
00015
00016 printf(" DroppedPacketsBadChecksum\t%u\n", stats.RcvDroppedPacketsBadChecksum);
00017 printf(" PacketsMissedByIndex\t\t%u\n", stats.RcvPacketsMissedByIndex);
00018 printf(" DroppedPacketsQueueFull\t%u\n", stats.RcvDroppedPacketsQueueFull);
00019 printf(" QueueErrors\t\t\t%u\n", stats.RcvQueueErrors);
00020 printf("\n");
00021
00022 printf(" NumTelegrams\t\t\t%u\n", stats.RcvNumTelegrams);
00023 printf(" NumRequests\t\t\t%u\n", stats.RcvNumRequests);
00024 printf(" NumResponses\t\t\t%u (w_sem=%u, wo_sem=%u)\n", stats.RcvNumResponses, stats.RcvResponsesWithSemaphore, stats.RcvResponsesWithoutSemaphore);
00025 printf(" NumAckPackets\t\t\t%u\n", stats.RcvAckPackets );
00026
00027 printf(" []NumPacketsWantAck\t\t%u\n", stats.RcvNumPacketsWantAck );
00028
00029 printf("\n");
00030 printf("Send Task:\n");
00031 printf(" SocketErrors\t\t\t%u\n", stats.SntSocketErrors);
00032 printf(" EncodingErrors\t\t\t%u\n", stats.SntEncodingErrors);
00033 printf("\n");
00034
00035 printf(" QueueErrors\t\t\t%u\n", stats.SntQueueErrors);
00036 printf(" QueueFull\t\t\t%u\n", stats.SntQueueFull);
00037 printf("\n");
00038
00039 printf(" NumTelegrams\t\t\t%u\n", stats.SntTelegrams);
00040 printf(" NumRequests\t\t\t%u\n", stats.SntRequests);
00041 printf(" NumResponses\t\t\t%u\n", stats.SntResponses);
00042 printf(" NumAckPackets\t\t\t%u\n", stats.SntAckPackets);
00043
00044 printf("\n");
00045 printf("Blocking call results:\n");
00046 printf(" RequestsSent\t\t\t%u\n", stats.BlockingCallRequestSent);
00047 printf(" ResponseTimeout\t\t%u\n", stats.BlockingCallResponseTimeout);
00048 printf(" ResponseReceived\t\t%u\n", stats.BlockingCallResponseReceivedOnTime);
00049 printf("\n");
00050 printf(" AckTimeout\t\t\t%u\n", stats.AckTimeout);
00051
00052 printf("\n");
00053 UINT32 endtime = time(NULL);
00054 printf("Running for %u seconds\n",endtime - stats.StartTime.tv_sec);
00055 }
00056
00057 void SimulateLocalPeer(const char *aIPAddressOtherPeer, UINT16 aPortOtherPeer, UINT16 aPort)
00058 {
00059 static CNetworkQueue q;
00060
00061 bool ret;
00062 bool running = true;
00063 bool send_go = false;
00064
00065 NQ_TIMEVAL tmout;
00066 UINT32 ip_hostorder_peer;
00067 UINT16 port_hostorder_peer;
00068 const UINT32 arrlength=716;
00069 INT16 arr[arrlength];
00070
00071 tmout.tv_usec = 10*1000;
00072 tmout.tv_sec = 0;
00073
00074 platformInitializeConsoleInput();
00075 platformInitializeSocketSubsystem();
00076 printf("At the beginning of SimulateLocalPeer\n");
00077
00078 printf("Before Queue.create()\n");
00079 ret = q.create(aIPAddressOtherPeer, aPortOtherPeer, aPort);
00080 printf("Queue.create() returned: %d\n", ret);
00081 if (ret != true)
00082 {
00083 printf("Queue could not be initialized. Exiting.\n");
00084 platformDeinitializeConsoleInput();
00085 platformDeinitializeSocketSubsystem();
00086 return;
00087 }
00088
00089 ip_hostorder_peer = ntohl( platformIPAddrConvert( aIPAddressOtherPeer ) );
00090 port_hostorder_peer = aPortOtherPeer;
00091
00092
00093 UINT32 starttime = platformGetTickCount();
00094 UINT32 stoptime = platformGetTickCount();
00095 UINT32 nextTickCount = 0;
00096
00097 while( running )
00098 {
00099 if (platformkbhit())
00100 {
00101 char c = platformgetch();
00102 if (c == 'q' || c == 'Q')
00103 {
00104 printf("Received 'q' uit key. Exiting.\n");
00105 running = false;
00106 }
00107 if (c == 's' || c == 'S')
00108 {
00109 NQ_STATISTICS stats;
00110 q.getStatistics( stats );
00111 printStatistics( stats );
00112 }
00113
00114 if (c == 'g' || c == 'G')
00115 {
00116 if (send_go == false)
00117 {
00118 printf("Go!\n");
00119 send_go = true;
00120 starttime = platformGetTickCount();
00121 }
00122 else
00123 {
00124 printf("Stop!\n");
00125 send_go = false;
00126 stoptime = platformGetTickCount();
00127 }
00128 }
00129 }
00130
00131
00132
00133
00134
00135
00136 CNQMessage *tele;
00137
00138
00139 if (send_go == true)
00140 {
00141 tele = q.createMsg();
00142 if (tele != NULL)
00143 {
00144 NQ_TIMEVAL timeout;
00145
00146 tele->setType(NQ_PT_TELEGRAM, NQ_LT_UPDATE);
00147
00148 for (int t=0;t<(INT32)arrlength;t++)
00149 {
00150 arr[t] = -((t*2)+1);
00151 }
00152
00153 ret = tele->write(236, NQ_VARTYPE_INT16, arr, arrlength);
00154 if (ret == false) printf("Packet encoding failed.\n");
00155
00156 timeout.tv_sec = 0;
00157 timeout.tv_usec = 900*1000;
00158
00159 ret = q.sendMsgTo(tele, ip_hostorder_peer, port_hostorder_peer, timeout);
00160
00161 if (ret == false)
00162 {
00163 if (q.getLastError() != ERR_NQ_SENDQUEUEFULL) printf("Queue send error: %d\n", q.getLastError() );
00164 else platformSleepMilliseconds(20);
00165 }
00166
00167
00168 q.freeMsg( tele );
00169 }
00170
00171
00172
00173
00174 }
00175
00176 if ((send_go == true) && (platformGetTickCount() >= nextTickCount) )
00177 {
00178 CNQMessage *syncRequest;
00179 NQ_TIMEVAL syncTimeout;
00180 static UINT32 val=0;
00181 UINT32 valback;
00182
00183 syncRequest = q.createMsg();
00184
00185 if (syncRequest != NULL)
00186 {
00187 syncTimeout.tv_sec = 0;
00188 syncTimeout.tv_usec = 999*1000;
00189
00190 syncRequest->setType(NQ_PT_REQUEST, NQ_LT_CALL);
00191
00192 val++;
00193 if (val > 300) val = 1;
00194 ret = syncRequest->write_UINT32(1, val);
00195
00196
00197 if (ret == true)
00198 {
00199 CNQMessage *syncResponse;
00200
00201
00202
00203
00204 ret = q.sendRecvMsg(&syncResponse, syncRequest, ip_hostorder_peer, port_hostorder_peer, syncTimeout );
00205
00206
00207
00208
00209 if (ret == false)
00210 {
00211 printf("No response received (sync call) due to %d (Timeout is %d).\n", q.getLastError(), ERR_NQ_TIMEOUT);
00212 }
00213 else
00214 {
00215 UINT32 valback_expected = ((val*200)/44)%30;
00216
00217 ret = syncResponse->read_UINT32(1, valback);
00218 if ((ret != true) || (valback != valback_expected))
00219 {
00220 printf("Logical error in sync call response valb=%d, valb_expected=%d, val=%d\n",valback, valback_expected, val);
00221 }
00222
00223 q.freeMsg( syncResponse );
00224 }
00225 }
00226 else printf("sync Packet encoding failed.\n");
00227
00228 q.freeMsg( syncRequest );
00229 }
00230 else
00231 printf("Synchronous call failed: no message buffer free!\n");
00232
00233 nextTickCount = platformGetTickCount() + 1000;
00234 }
00235
00236
00237
00238 while (q.recvMsg(&tele) == true)
00239 {
00240
00241
00242 if (tele->isTelegram() == true)
00243 {
00244 if ((tele->getPhysicalType() == NQ_PT_TELEGRAM) &&
00245 (tele->getLogicalType() == NQ_LT_UPDATE))
00246 {
00247
00248 memset(arr, 0, sizeof(arr));
00249
00250 UINT32 arrlength_returned=arrlength;
00251
00252 ret = tele->read(236, NQ_VARTYPE_INT16, arr, arrlength_returned);
00253 if (ret == false) printf("Packet decoding failed.\n");
00254
00255 if (arrlength_returned != arrlength) printf("Returned arrlength differs!\n");
00256
00257 for (int t=0;t<(INT32)arrlength;t++)
00258 {
00259 if (arr[t] != (-((t*4)+1))) printf("Error in array at t=%d\n",t);
00260 }
00261
00262 }
00263 else printf("Received wrong (bad) telegram. This should not happen.\n");
00264 }
00265 else
00266 printf("Received wrong (bad) message. This may happen (timed out response).\n");
00267
00268 q.freeMsg( tele );
00269
00270 }
00271
00272 }
00273 if (send_go == true) stoptime = platformGetTickCount();
00274 printf("Out of while loop.\n");
00275
00276 ret = q.close();
00277 printf("Queue.close() local returned: %d\n", ret);
00278
00279 NQ_STATISTICS stats1;
00280 q.getStatistics( stats1 );
00281 printStatistics( stats1 );
00282
00283 printf("Data transfer was running for %u msec.\n", stoptime-starttime);
00284
00285 printf("At the end. Exiting.\n");
00286 platformDeinitializeConsoleInput();
00287 platformDeinitializeSocketSubsystem();
00288 }
00289
00290
00291 void SimulateRemotePeer(const char *aIPAddressOtherPeer, UINT16 aPortOtherPeer, UINT16 aPort)
00292 {
00293 static CNetworkQueue q;
00294
00295 bool ret;
00296 bool running = true;
00297
00298 NQ_TIMEVAL tmout;
00299 UINT32 ip_hostorder_peer;
00300 UINT16 port_hostorder_peer;
00301
00302 const UINT32 arrlength=716;
00303 INT16 arr[arrlength];
00304
00305 tmout.tv_usec = 10*1000;
00306 tmout.tv_sec = 0;
00307
00308 platformInitializeSocketSubsystem();
00309 platformInitializeConsoleInput();
00310 printf("At the beginning of SimulateRemotePeer\n");
00311
00312 printf("Before Queue.create()\n");
00313 ret = q.create(aIPAddressOtherPeer, aPortOtherPeer, aPort);
00314 printf("Queue.create() returned: %d\n", ret);
00315 if (ret != true)
00316 {
00317 printf("Queue could not be initialized. Exiting.\n");
00318 platformDeinitializeConsoleInput();
00319 platformDeinitializeSocketSubsystem();
00320 return;
00321 }
00322
00323 ip_hostorder_peer = ntohl( platformIPAddrConvert( aIPAddressOtherPeer ) );
00324 port_hostorder_peer = aPortOtherPeer;
00325
00326
00327 UINT32 starttime = platformGetTickCount();
00328
00329 while( running )
00330 {
00331 if (platformkbhit())
00332 {
00333 char c = platformgetch();
00334 if (c == 'q' || c == 'Q')
00335 {
00336 printf("Received 'q' uit key. Exiting.\n");
00337 running = false;
00338 }
00339 if (c == 'b' || c == 'B')
00340 {
00341 printf("removing remote peer %s:%d\n", aIPAddressOtherPeer, aPortOtherPeer);
00342 if (q.removePeer(ip_hostorder_peer, port_hostorder_peer) == false) printf("Error: Could NOT remove remote peer!\n");
00343 }
00344
00345 if (c == 'u' || c == 'U')
00346 {
00347 printf("registering remote peer %s:%d\n", aIPAddressOtherPeer, aPortOtherPeer);
00348 if (q.addPeer(ip_hostorder_peer, port_hostorder_peer) == false) printf("Error: Could NOT (re-)register remote peer!\n");
00349 }
00350
00351 if (c == 'i' || c == 'I')
00352 {
00353 UINT32 ip_hostorder_additionalpeer = ntohl( platformIPAddrConvert( "141.34.30.177" ) );
00354 UINT16 port_hostorder_additionalpeer = 31700;
00355
00356 printf("registering remote peer %s:%d\n", platformIPAddrConvert(ip_hostorder_additionalpeer), port_hostorder_additionalpeer);
00357 if (q.addPeer(ip_hostorder_additionalpeer, port_hostorder_additionalpeer) == false) printf("Error: Could NOT register additional peer!\n");
00358 }
00359
00360 if (c == 's' || c == 'S')
00361 {
00362 NQ_STATISTICS stats;
00363 q.getStatistics( stats );
00364 printStatistics( stats );
00365 }
00366
00367
00368 }
00369
00370
00371 CNQMessage *tele_remote;
00372 if (q.recvMsg(&tele_remote, tmout) == true)
00373 {
00374 if (tele_remote->isTelegram() == true)
00375 {
00376 if ((tele_remote->getPhysicalType() == NQ_PT_TELEGRAM) &&
00377 (tele_remote->getLogicalType() == NQ_LT_UPDATE))
00378 {
00379
00380 {
00381 UINT32 arrlength_returned=arrlength;
00382
00383 memset( arr, 0, sizeof(arr));
00384
00385 ret = tele_remote->read(236, NQ_VARTYPE_INT16, arr, arrlength_returned);
00386 if (ret == false) printf("Packet decoding failed.\n");
00387
00388 if (arrlength_returned != arrlength) printf("Returned arrlength differs!\n");
00389
00390 }
00391
00392 UINT32 ip;
00393 UINT16 port;
00394 bool gotIPAndPort;
00395 gotIPAndPort = tele_remote->getSenderIPandPort(ip, port);
00396
00397 {
00398 for (int t=0;t<(INT32)arrlength;t++)
00399 {
00400 if (arr[t] != (-((t*2)+1))) printf("Error in array at t=%d\n",t);
00401 }
00402 }
00403
00404 {
00405 for (int t=0;t<(INT32)arrlength;t++)
00406 {
00407 arr[t] = -((t*4)+1);
00408 }
00409
00410 tele_remote->clear();
00411 tele_remote->setType(NQ_PT_TELEGRAM, NQ_LT_UPDATE);
00412
00413 ret = tele_remote->write(236, NQ_VARTYPE_INT16, arr, arrlength);
00414 if (ret == false) printf("Packet encoding failed.\n");
00415
00416 }
00417
00418 if (gotIPAndPort == true)
00419 {
00420
00421 ret = q.sendMsgTo(tele_remote, ip, port);
00422 if (ret == false)
00423 {
00424 if (q.getLastError() != ERR_NQ_SENDQUEUEFULL) printf("Queue send error: %d\n", q.getLastError() );
00425 else platformSleepMilliseconds(20);
00426 }
00427 }
00428 else printf("Unable to get ip and port of remote peer.\n");
00429 }
00430 else printf("Received wrong (bad) telegram. This should not happen.\n");
00431 }
00432 else if (tele_remote->isRequest() == true)
00433 {
00434 UINT32 val;
00435
00436
00437 tele_remote->read_UINT32(1, val);
00438 val = ((val*200)/44)%30;
00439 tele_remote->write_UINT32(1, val);
00440 tele_remote->setType(NQ_PT_RESPONSE, tele_remote->getLogicalType());
00441
00442 UINT32 ip;
00443 UINT16 port;
00444 ret = tele_remote->getSenderIPandPort(ip, port);
00445 if (ret == true)
00446 {
00447
00448
00449 ret = q.sendMsgTo(tele_remote, ip, port);
00450 if (ret == false)
00451 {
00452 printf("Send sync Response: Queue send error: %d\n", q.getLastError() );
00453 }
00454 }
00455 else printf("Send sync Response: Could not check from where the message came.\n");
00456
00457 }
00458 else
00459 printf("Received wrong (bad) message. This should not happen.\n");
00460
00461
00462 q.freeMsg( tele_remote );
00463 }
00464
00465 }
00466 UINT32 stoptime = platformGetTickCount();
00467
00468 printf("Out of while loop.\n");
00469
00470 ret = q.close();
00471 printf("Queue.close() remote returned: %d\n", ret);
00472
00473 NQ_STATISTICS stats1;
00474 q.getStatistics( stats1 );
00475 printStatistics( stats1 );
00476
00477 printf("Data transfer was running for %u msec.\n", stoptime-starttime);
00478
00479 printf("At the end. Exiting.\n");
00480 platformDeinitializeConsoleInput();
00481 platformDeinitializeSocketSubsystem();
00482 }