@@ -326,36 +326,40 @@ public void run() {
326
326
notifyingPipes .remove (p );
327
327
}
328
328
}
329
- } else if ((now - p .lastPipeNotified ) * (2 + p .pipeSequence - p .notifySequence )
329
+ } else {
330
+ p .lastLiveDetected = System .currentTimeMillis ();
331
+ p .updateStatus (true );
332
+ if ((now - p .lastPipeNotified ) * (2 + p .pipeSequence - p .notifySequence )
330
333
< pipeLiveNotifyInterval + pipeLiveNotifyInterval ) {
331
- // do nothing
332
- } else if (r .pipeSequence != r .notifySequence ) {
333
- p .lastPipeNotified = now ;
334
- final HttpRequest request = getRequest ();
335
- final String pipeKey = p .pipeKey ;
336
- final long sequence = p .pipeSequence ;
337
- String pipeRequestData = constructRequest (pipeKey , PIPE_TYPE_NOTIFY , sequence );
338
- request .registerOnReadyStateChange (new XHRCallbackAdapter () {
339
- public void onLoaded () {
340
- String response = request .getResponseText ();
341
- if (response != null && p .notifySequence < sequence && response .indexOf ("$p1p3b$" ) != 0 ) {
342
- p .notifySequence = sequence ;
343
- }
344
- if (response != null && response .indexOf ("\" " + PIPE_STATUS_LOST + "\" " ) != -1 ) {
345
- p .pipeAlive = false ;
346
- p .pipeLost ();
347
- SimplePipeHelper .removePipe (pipeKey );
348
- // may need to inform user that connection is already lost!
349
- synchronized (notifyingMutex ) {
350
- notifyingPipes .remove (p );
334
+ // do nothing
335
+ } else if (r .pipeSequence != r .notifySequence ) {
336
+ p .lastPipeNotified = now ;
337
+ final HttpRequest request = getRequest ();
338
+ final String pipeKey = p .pipeKey ;
339
+ final long sequence = p .pipeSequence ;
340
+ String pipeRequestData = constructRequest (pipeKey , PIPE_TYPE_NOTIFY , sequence );
341
+ request .registerOnReadyStateChange (new XHRCallbackAdapter () {
342
+ public void onLoaded () {
343
+ String response = request .getResponseText ();
344
+ if (response != null && p .notifySequence < sequence && response .indexOf ("$p1p3b$" ) != 0 ) {
345
+ p .notifySequence = sequence ;
351
346
}
352
- } else {
353
- p .lastLiveDetected = System .currentTimeMillis ();
354
- p .updateStatus (true );
355
- }
356
- }
357
- });
358
- sendRequest (request , p .getPipeMethod (), p .getPipeURL (), pipeRequestData , pipes .length == 1 ? false : queueNotifying );
347
+ if (response != null && response .indexOf ("\" " + PIPE_STATUS_LOST + "\" " ) != -1 ) {
348
+ p .pipeAlive = false ;
349
+ p .pipeLost ();
350
+ SimplePipeHelper .removePipe (pipeKey );
351
+ // may need to inform user that connection is already lost!
352
+ synchronized (notifyingMutex ) {
353
+ notifyingPipes .remove (p );
354
+ }
355
+ } else {
356
+ p .lastLiveDetected = System .currentTimeMillis ();
357
+ p .updateStatus (true );
358
+ }
359
+ }
360
+ });
361
+ sendRequest (request , p .getPipeMethod (), p .getPipeURL (), pipeRequestData , pipes .length == 1 ? false : queueNotifying );
362
+ }
359
363
}
360
364
} // end of pipes for-loop
361
365
} // end of while true
@@ -1385,7 +1389,8 @@ public void run() {
1385
1389
if (last <= 0) {
1386
1390
last = created;
1387
1391
}
1388
- if ((runnable.queryEnded || (now - last >= spr.pipeLiveNotifyInterval
1392
+ if (runnable.isPipeLive() // may be false after a few queries
1393
+ && (runnable.queryEnded || (now - last >= spr.pipeLiveNotifyInterval
1389
1394
&& (lastXHR == -1 || now - lastXHR >= spr.pipeLiveNotifyInterval)))
1390
1395
&& runnable.queryFailedRetries < 3) {
1391
1396
runnable.queryEnded = false;
@@ -1398,7 +1403,12 @@ public void run() {
1398
1403
}
1399
1404
runnable.retries = runnable.queryFailedRetries;
1400
1405
runnable.received = runnable.lastPipeDataReceived;
1401
- if (runnable.queryFailedRetries >= 3
1406
+ if (!runnable.isPipeLive()) { // try to clean up for lost pipes
1407
+ runnable.pipeAlive = false;
1408
+ runnable.pipeClosed();
1409
+ sph.removePipe(key);
1410
+ spr.pipeIFrameClean (key);
1411
+ } else if (runnable.queryFailedRetries >= 3
1402
1412
|| now - last > 3 * spr.pipeLiveNotifyInterval) {
1403
1413
if (key == runnable.pipeKey) {
1404
1414
runnable.pipeAlive = false;
@@ -1419,7 +1429,7 @@ public void run() {
1419
1429
Thread queryThread = new Thread (new Runnable () {
1420
1430
public void run () {
1421
1431
SimplePipeRunnable runnable = null ;
1422
- while ((runnable = SimplePipeHelper .getPipe (key )) != null ) {
1432
+ while ((runnable = SimplePipeHelper .getPipe (key )) != null && runnable . isPipeLive () ) {
1423
1433
pipeQuery (runnable );
1424
1434
1425
1435
long now = System .currentTimeMillis ();
@@ -1443,6 +1453,12 @@ public void run() {
1443
1453
//e.printStackTrace();
1444
1454
}
1445
1455
}
1456
+ if (runnable != null ) { // runnable is still in SimplePipeHelper before #removePipe
1457
+ runnable .pipeAlive = false ;
1458
+ runnable .pipeClosed ();
1459
+ }
1460
+ SimplePipeHelper .removePipe (key );
1461
+
1446
1462
}
1447
1463
}, "Simple Pipe Query Worker" );
1448
1464
queryThread .setDaemon (true );
0 commit comments