-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathStreamDemultiplexor.java
More file actions
973 lines (818 loc) · 23.4 KB
/
StreamDemultiplexor.java
File metadata and controls
973 lines (818 loc) · 23.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
/*
* @(#)StreamDemultiplexor.java 0.3-3 06/05/2001
*
* This file is part of the HTTPClient package
* Copyright (C) 1996-2001 Ronald Tschal�r
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free
* Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
* MA 02111-1307, USA
*
* For questions, suggestions, bug-reports, enhancement-requests etc.
* I may be contacted at:
*
* ronald@innovation.ch
*
* The HTTPClient's home page is located at:
*
* http://www.innovation.ch/java/HTTPClient/
*
*/
package HTTPClient;
import java.io.IOException;
import java.io.EOFException;
import java.io.InterruptedIOException;
import java.net.Socket;
import java.net.SocketException;
/**
* This class handles the demultiplexing of input stream. This is needed
* for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG.
*
* @version 0.3-3 06/05/2001
* @author Ronald Tschal�r
*/
public class StreamDemultiplexor implements GlobalConstants
{
/** the protocol were handling request for */
private int Protocol;
/** the connection we're working for */
private HTTPConnection Connection;
/** the input stream to demultiplex */
private BufferedInputStream Stream;
/** the socket this hangs off */
private Socket Sock = null;
/** signals after the closing of which stream to close the socket */
private ResponseHandler MarkedForClose;
/** timer used to close the socket if unused for a given time */
private SocketTimeout.TimeoutEntry Timer = null;
/** timer thread which implements the timers */
private static SocketTimeout TimerThread = null;
/** cleanup object to stop timer thread when we're gc'd */
private static Object cleanup;
/** a Vector to hold the list of response handlers were serving */
private LinkedList RespHandlerList;
/** number of unread bytes in current chunk (if transf-enc == chunked) */
private long chunk_len;
/** the currently set timeout for the socket */
private int cur_timeout = 0;
static
{
TimerThread = new SocketTimeout(60);
TimerThread.start();
/* This is here to clean up the timer thread should the
* StreamDemultiplexor class be gc'd. This will not usually happen,
* unless the stuff is being run in an Applet or similar environment
* where multiple classloaders are used to load the same class
* multiple times. However, even in those environments it's not clear
* that this here will do us any good, because classes aren't usually
* gc'd unless their classloader is, but the timer thread keeps a
* reference to the classloader, and hence ought to prevent the
* classloader from being gc'd.
*/
cleanup = new Object() {
private final SocketTimeout timer = StreamDemultiplexor.TimerThread;
protected void finalize()
{
timer.kill();
}
};
}
// Constructors
/**
* a simple contructor.
*
* @param protocol the protocol used on this stream.
* @param sock the socket which we're to demux.
* @param connection the http-connection this socket belongs to.
*/
StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection)
throws IOException
{
this.Protocol = protocol;
this.Connection = connection;
RespHandlerList = new LinkedList();
init(sock);
}
/**
* Initializes the demultiplexor with a new socket.
*
* @param stream the stream to demultiplex
*/
private void init(Socket sock) throws IOException
{
Log.write(Log.DEMUX, "Demux: Initializing Stream Demultiplexor (" +
this.hashCode() + ")");
this.Sock = sock;
this.Stream = new BufferedInputStream(sock.getInputStream());
MarkedForClose = null;
chunk_len = -1;
// create a timer to close the socket after 60 seconds, but don't
// start it yet
Timer = TimerThread.setTimeout(this);
Timer.hyber();
}
// Methods
/**
* Each Response must register with us.
*/
void register(Response resp_handler, Request req) throws RetryException
{
synchronized (RespHandlerList)
{
if (Sock == null)
throw new RetryException();
RespHandlerList.addToEnd(
new ResponseHandler(resp_handler, req, this));
}
}
/**
* creates an input stream for the response.
*
* @param resp the response structure requesting the stream
* @return an InputStream
*/
RespInputStream getStream(Response resp)
{
ResponseHandler resph;
synchronized (RespHandlerList)
{
for (resph = (ResponseHandler) RespHandlerList.enumerate();
resph != null;
resph = (ResponseHandler) RespHandlerList.next())
{
if (resph.resp == resp) break;
}
}
if (resph != null)
return resph.stream;
else
return null;
}
/**
* Restarts the timer thread that will close an unused socket after
* 60 seconds.
*/
void restartTimer()
{
if (Timer != null) Timer.reset();
}
/**
* reads an array of bytes from the master stream.
*/
int read(byte[] b, int off, int len, ResponseHandler resph, int timeout)
throws IOException
{
if (resph.exception != null)
{
resph.exception.fillInStackTrace();
throw resph.exception;
}
if (resph.eof)
return -1;
// read the headers and data for all responses preceding us.
ResponseHandler head;
while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null &&
head != resph)
{
try
{ head.stream.readAll(timeout); }
catch (IOException ioe)
{
if (ioe instanceof InterruptedIOException)
throw ioe;
else
{
resph.exception.fillInStackTrace();
throw resph.exception;
}
}
}
// Now we can read from the stream.
synchronized (this)
{
if (resph.exception != null)
{
resph.exception.fillInStackTrace();
throw resph.exception;
}
if (resph.resp.cd_type != CD_HDRS)
Log.write(Log.DEMUX, "Demux: Reading for stream " +
resph.stream.hashCode());
if (Timer != null) Timer.hyber();
try
{
int rcvd = -1;
if (timeout != cur_timeout)
{
Log.write(Log.DEMUX, "Demux: Setting timeout to " +
timeout + " ms");
Sock.setSoTimeout(timeout);
cur_timeout = timeout;
}
switch (resph.resp.cd_type)
{
case CD_HDRS:
rcvd = Stream.read(b, off, len);
if (rcvd == -1)
throw new EOFException("Premature EOF encountered");
break;
case CD_0:
rcvd = -1;
close(resph);
break;
case CD_CLOSE:
rcvd = Stream.read(b, off, len);
if (rcvd == -1)
close(resph);
break;
case CD_CONTLEN:
int cl = resph.resp.ContentLength;
if (len > cl - resph.stream.count)
len = cl - resph.stream.count;
rcvd = Stream.read(b, off, len);
if (rcvd == -1)
throw new EOFException("Premature EOF encountered");
if (resph.stream.count+rcvd == cl)
close(resph);
break;
case CD_CHUNKED:
if (chunk_len == -1) // it's a new chunk
chunk_len = Codecs.getChunkLength(Stream);
if (chunk_len > 0) // it's data
{
if (len > chunk_len) len = (int) chunk_len;
rcvd = Stream.read(b, off, len);
if (rcvd == -1)
throw new EOFException("Premature EOF encountered");
chunk_len -= rcvd;
if (chunk_len == 0) // got the whole chunk
{
Stream.read(); // CR
Stream.read(); // LF
chunk_len = -1;
}
}
else // the footers (trailers)
{
resph.resp.readTrailers(Stream);
rcvd = -1;
close(resph);
chunk_len = -1;
}
break;
case CD_MP_BR:
byte[] endbndry = resph.getEndBoundary(Stream);
int[] end_cmp = resph.getEndCompiled(Stream);
rcvd = Stream.read(b, off, len);
if (rcvd == -1)
throw new EOFException("Premature EOF encountered");
int ovf = Stream.pastEnd(endbndry, end_cmp);
if (ovf != -1)
{
rcvd -= ovf;
close(resph);
}
break;
default:
throw new Error("Internal Error in StreamDemultiplexor: " +
"Invalid cd_type " + resph.resp.cd_type);
}
restartTimer();
return rcvd;
}
catch (InterruptedIOException ie) // don't intercept this one
{
restartTimer();
throw ie;
}
catch (IOException ioe)
{
Log.write(Log.DEMUX, "Demux: ", ioe);
close(ioe, true);
throw resph.exception; // set by retry_requests
}
catch (ParseException pe)
{
Log.write(Log.DEMUX, "Demux: ", pe);
close(new IOException(pe.toString()), true);
throw resph.exception; // set by retry_requests
}
}
}
/**
* skips a number of bytes in the master stream. This is done via a
* dummy read, as the socket input stream doesn't like skip()'s.
*/
synchronized long skip(long num, ResponseHandler resph) throws IOException
{
if (resph.exception != null)
{
resph.exception.fillInStackTrace();
throw resph.exception;
}
if (resph.eof)
return 0;
byte[] dummy = new byte[(int) num];
int rcvd = read(dummy, 0, (int) num, resph, 0);
if (rcvd == -1)
return 0;
else
return rcvd;
}
/**
* Determines the number of available bytes. If <var>resph</var> is null, return
* available bytes on the socket stream itself (used by HTTPConnection).
*/
synchronized int available(ResponseHandler resph) throws IOException
{
if (resph != null && resph.exception != null)
{
resph.exception.fillInStackTrace();
throw resph.exception;
}
if (resph != null && resph.eof)
return 0;
int avail = Stream.available();
if (resph == null)
return avail;
switch (resph.resp.cd_type)
{
case CD_0:
return 0;
case CD_HDRS:
// this is something of a hack; I could return 0, but then
// if you were waiting for something on a response that
// wasn't first in line (and you didn't try to read the
// other response) you'd wait forever. On the other hand,
// we might be making a false promise here...
return (avail > 0 ? 1 : 0);
case CD_CLOSE:
return avail;
case CD_CONTLEN:
int cl = resph.resp.ContentLength;
cl -= resph.stream.count;
return (avail < cl ? avail : cl);
case CD_CHUNKED:
return avail; // not perfect...
case CD_MP_BR:
return avail; // not perfect...
default:
throw new Error("Internal Error in StreamDemultiplexor: " +
"Invalid cd_type " + resph.resp.cd_type);
}
}
/**
* Closes the socket and all associated streams. If <var>exception</var>
* is not null then all active requests are retried.
*
* <P>There are five ways this method may be activated. 1) if an exception
* occurs during read or write. 2) if the stream is marked for close but
* no responses are outstanding (e.g. due to a timeout). 3) when the
* markedForClose response is closed. 4) if all response streams up until
* and including the markedForClose response have been closed. 5) if this
* demux is finalized.
*
* @param exception the IOException to be sent to the streams.
* @param was_reset if true then the exception is due to a connection
* reset; otherwise it means we generated the exception
* ourselves and this is a "normal" close.
*/
synchronized void close(IOException exception, boolean was_reset)
{
if (Sock == null) // already cleaned up
return;
Log.write(Log.DEMUX, "Demux: Closing all streams and socket (" +
this.hashCode() + ")");
try
{ Stream.close(); }
catch (IOException ioe) { }
try
{ Sock.close(); }
catch (IOException ioe) { }
Sock = null;
if (Timer != null)
{
Timer.kill();
Timer = null;
}
Connection.DemuxList.remove(this);
// Here comes the tricky part: redo outstanding requests!
if (exception != null)
synchronized (RespHandlerList)
{ retry_requests(exception, was_reset); }
}
/**
* Retries outstanding requests. Well, actually the RetryModule does
* that. Here we just throw a RetryException for each request so that
* the RetryModule can catch and handle them.
*
* @param exception the exception that led to this call.
* @param was_reset this flag is passed to the RetryException and is
* used by the RetryModule to distinguish abnormal closes
* from expected closes.
*/
private void retry_requests(IOException exception, boolean was_reset)
{
RetryException first = null,
prev = null;
ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
while (resph != null)
{
/* if the application is already reading the data then the
* response has already been handled. In this case we must
* throw the real exception.
*/
if (resph.resp.got_headers)
{
resph.exception = exception;
}
else
{
RetryException tmp = new RetryException(exception.getMessage());
if (first == null) first = tmp;
tmp.request = resph.request;
tmp.response = resph.resp;
tmp.exception = exception;
tmp.conn_reset = was_reset;
tmp.first = first;
tmp.addToListAfter(prev);
prev = tmp;
resph.exception = tmp;
}
RespHandlerList.remove(resph);
resph = (ResponseHandler) RespHandlerList.next();
}
}
/**
* Closes the associated stream. If this one has been markedForClose then
* the socket is closed; else closeSocketIfAllStreamsClosed is invoked.
*/
private void close(ResponseHandler resph)
{
synchronized (RespHandlerList)
{
if (resph != (ResponseHandler) RespHandlerList.getFirst())
return;
Log.write(Log.DEMUX, "Demux: Closing stream " +
resph.stream.hashCode());
resph.eof = true;
RespHandlerList.remove(resph);
}
if (resph == MarkedForClose)
close(new IOException("Premature end of Keep-Alive"), false);
else
closeSocketIfAllStreamsClosed();
}
/**
* Close the socket if all the streams have been closed.
*
* <P>When a stream reaches eof it is removed from the response handler
* list, but when somebody close()'s the response stream it is just
* marked as such. This means that all responses in the list have either
* not been read at all or only partially read, but they might have been
* close()'d meaning that nobody is interested in the data. So If all the
* response streams up till and including the one markedForClose have
* been close()'d then we can remove them from our list and close the
* socket.
*
* <P>Note: if the response list is emtpy or if no response is
* markedForClose then this method does nothing. Specifically it does
* not close the socket. We only want to close the socket if we've been
* told to do so.
*
* <P>Also note that there might still be responses in the list after
* the markedForClose one. These are due to us having pipelined more
* requests to the server than it's willing to serve on a single
* connection. These requests will be retried if possible.
*/
synchronized void closeSocketIfAllStreamsClosed()
{
synchronized (RespHandlerList)
{
ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
while (resph != null && resph.stream.closed)
{
if (resph == MarkedForClose)
{
// remove all response handlers first
ResponseHandler tmp;
do
{
tmp = (ResponseHandler) RespHandlerList.getFirst();
RespHandlerList.remove(tmp);
}
while (tmp != resph);
// close the socket
close(new IOException("Premature end of Keep-Alive"), false);
return;
}
resph = (ResponseHandler) RespHandlerList.next();
}
}
}
/**
* returns the socket associated with this demux
*/
synchronized Socket getSocket()
{
if (MarkedForClose != null)
return null;
if (Timer != null) Timer.hyber();
return Sock;
}
/**
* Mark this demux to not accept any more request and to close the
* stream after this <var>resp</var>onse or all requests have been
* processed, or close immediately if no requests are registered.
*
* @param response the Response after which the connection should
* be closed.
*/
synchronized void markForClose(Response resp)
{
synchronized (RespHandlerList)
{
if (RespHandlerList.getFirst() == null) // no active request,
{ // so close the socket
close(new IOException("Premature end of Keep-Alive"), false);
return;
}
if (Timer != null)
{
Timer.kill();
Timer = null;
}
ResponseHandler resph, lasth = null;
for (resph = (ResponseHandler) RespHandlerList.enumerate();
resph != null;
resph = (ResponseHandler) RespHandlerList.next())
{
if (resph.resp == resp) // new resp precedes any others
{
MarkedForClose = resph;
Log.write(Log.DEMUX, "Demux: stream " +
resp.inp_stream.hashCode() +
" marked for close");
closeSocketIfAllStreamsClosed();
return;
}
if (MarkedForClose == resph)
return; // already marked for closing after an earlier resp
lasth = resph;
}
if (lasth == null)
return;
MarkedForClose = lasth; // resp == null, so use last resph
closeSocketIfAllStreamsClosed();
Log.write(Log.DEMUX, "Demux: stream " + lasth.stream.hashCode() +
" marked for close");
}
}
/**
* Emergency stop. Closes the socket and notifies the responses that
* the requests are aborted.
*
* @since V0.3
*/
void abort()
{
Log.write(Log.DEMUX, "Demux: Aborting socket (" + this.hashCode() + ")");
// notify all responses of abort
synchronized (RespHandlerList)
{
for (ResponseHandler resph =
(ResponseHandler) RespHandlerList.enumerate();
resph != null;
resph = (ResponseHandler) RespHandlerList.next())
{
if (resph.resp.http_resp != null)
resph.resp.http_resp.markAborted();
if (resph.exception == null)
resph.exception = new IOException("Request aborted by user");
}
/* Close the socket.
* Note: this duplicates most of close(IOException, boolean). We
* do *not* call close() because that is synchronized, but we want
* abort() to be asynch.
*/
if (Sock != null)
{
try
{
try
{ Sock.setSoLinger(false, 0); }
catch (SocketException se)
{ }
try
{ Stream.close(); }
catch (IOException ioe) { }
try
{ Sock.close(); }
catch (IOException ioe) { }
Sock = null;
if (Timer != null)
{
Timer.kill();
Timer = null;
}
}
catch (NullPointerException npe)
{ }
Connection.DemuxList.remove(this);
}
}
}
/**
* A safety net to close the connection.
*/
protected void finalize() throws Throwable
{
close((IOException) null, false);
super.finalize();
}
/**
* produces a string.
* @return a string containing the class name and protocol number
*/
public String toString()
{
String prot;
switch (Protocol)
{
case HTTP:
prot = "HTTP"; break;
case HTTPS:
prot = "HTTPS"; break;
case SHTTP:
prot = "SHTTP"; break;
case HTTP_NG:
prot = "HTTP_NG"; break;
default:
throw new Error("HTTPClient Internal Error: invalid protocol " +
Protocol);
}
return getClass().getName() + "[Protocol=" + prot + "]";
}
public static void cleanup()
{
TimerThread.kill();
}
}
/**
* This thread is used to reap idle connections. It is NOT used to timeout
* reads or writes on a socket. It keeps a list of timer entries and expires
* them after a given time.
*/
class SocketTimeout extends Thread
{
protected boolean alive = true;
/**
* This class represents a timer entry. It is used to close an
* inactive socket after n seconds. Once running, the timer may be
* suspended (hyber()), restarted (reset()), or aborted (kill()).
* When the timer expires it invokes markForClose() on the
* associated stream demultipexer.
*/
class TimeoutEntry
{
boolean restart = false,
hyber = false,
alive = true;
StreamDemultiplexor demux;
TimeoutEntry next = null,
prev = null;
TimeoutEntry(StreamDemultiplexor demux)
{
this.demux = demux;
}
void reset()
{
hyber = false;
if (restart) return;
restart = true;
synchronized (time_list)
{
if (!alive) return;
// remove from current position
next.prev = prev;
prev.next = next;
// and add to end of timeout list
next = time_list[current];
prev = time_list[current].prev;
prev.next = this;
next.prev = this;
}
}
void hyber()
{
if (alive) hyber = true;
}
void kill()
{
alive = false;
restart = false;
hyber = false;
synchronized (time_list)
{
if (prev == null) return;
next.prev = prev;
prev.next = next;
prev = null;
}
}
}
TimeoutEntry[] time_list; // jdk 1.1.x javac bug: these must not
int current; // be private!
SocketTimeout(int secs)
{
super("SocketTimeout");
try { setDaemon(true); }
catch (SecurityException se) { } // Oh well...
setPriority(MAX_PRIORITY);
time_list = new TimeoutEntry[secs];
for (int idx=0; idx<secs; idx++)
{
time_list[idx] = new TimeoutEntry(null);
time_list[idx].next = time_list[idx].prev = time_list[idx];
}
current = 0;
}
public TimeoutEntry setTimeout(StreamDemultiplexor demux)
{
TimeoutEntry entry = new TimeoutEntry(demux);
synchronized (time_list)
{
entry.next = time_list[current];
entry.prev = time_list[current].prev;
entry.prev.next = entry;
entry.next.prev = entry;
}
return entry;
}
/**
* This timer is implemented by sleeping for 1 second and then
* checking the timer list.
*/
public void run()
{
TimeoutEntry marked = null;
while (alive)
{
try { sleep(1000L); } catch (InterruptedException ie) { }
synchronized (time_list)
{
// reset all restart flags
for (TimeoutEntry entry = time_list[current].next;
entry != time_list[current];
entry = entry.next)
{
entry.restart = false;
}
current++;
if (current >= time_list.length)
current = 0;
// remove all expired timers
for (TimeoutEntry entry = time_list[current].next;
entry != time_list[current];
entry = entry.next)
{
if (entry.alive && !entry.hyber)
{
TimeoutEntry prev = entry.prev;
entry.kill();
/* put on death row. Note: we must not invoke
* markForClose() here because it is synch'd
* and can therefore lead to a deadlock if that
* thread is trying to do a reset() or kill()
*/
entry.next = marked;
marked = entry;
entry = prev;
}
}
}
while (marked != null)
{
marked.demux.markForClose(null);
marked = marked.next;
}
}
}
/**
* Stop the timer thread.
*/
public void kill() {
alive = false;
}
}