Source for gnu.java.nio.SelectorImpl

   1: /* SelectorImpl.java --
   2:    Copyright (C) 2002, 2003, 2004, 2006  Free Software Foundation, Inc.
   3: 
   4: This file is part of GNU Classpath.
   5: 
   6: GNU Classpath is free software; you can redistribute it and/or modify
   7: it under the terms of the GNU General Public License as published by
   8: the Free Software Foundation; either version 2, or (at your option)
   9: any later version.
  10: 
  11: GNU Classpath is distributed in the hope that it will be useful, but
  12: WITHOUT ANY WARRANTY; without even the implied warranty of
  13: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14: General Public License for more details.
  15: 
  16: You should have received a copy of the GNU General Public License
  17: along with GNU Classpath; see the file COPYING.  If not, write to the
  18: Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
  19: 02110-1301 USA.
  20: 
  21: Linking this library statically or dynamically with other modules is
  22: making a combined work based on this library.  Thus, the terms and
  23: conditions of the GNU General Public License cover the whole
  24: combination.
  25: 
  26: As a special exception, the copyright holders of this library give you
  27: permission to link this library with independent modules to produce an
  28: executable, regardless of the license terms of these independent
  29: modules, and to copy and distribute the resulting executable under
  30: terms of your choice, provided that you also meet, for each linked
  31: independent module, the terms and conditions of the license of that
  32: module.  An independent module is a module which is not derived from
  33: or based on this library.  If you modify this library, you may extend
  34: this exception to your version of the library, but you are not
  35: obligated to do so.  If you do not wish to do so, delete this
  36: exception statement from your version. */
  37: 
  38: 
  39: package gnu.java.nio;
  40: 
  41: import java.io.IOException;
  42: import java.nio.channels.ClosedSelectorException;
  43: import java.nio.channels.SelectableChannel;
  44: import java.nio.channels.SelectionKey;
  45: import java.nio.channels.Selector;
  46: import java.nio.channels.SocketChannel;
  47: import java.nio.channels.spi.AbstractSelectableChannel;
  48: import java.nio.channels.spi.AbstractSelector;
  49: import java.nio.channels.spi.SelectorProvider;
  50: import java.util.Collections;
  51: import java.util.HashSet;
  52: import java.util.Iterator;
  53: import java.util.Set;
  54: 
  55: public class SelectorImpl extends AbstractSelector
  56: {
  57:   private Set<SelectionKey> keys;
  58:   private Set<SelectionKey> selected;
  59: 
  60:   /**
  61:    * A dummy object whose monitor regulates access to both our
  62:    * selectThread and unhandledWakeup fields.
  63:    */
  64:   private Object selectThreadMutex = new Object ();
  65: 
  66:   /**
  67:    * Any thread that's currently blocked in a select operation.
  68:    */
  69:   private Thread selectThread;
  70: 
  71:   /**
  72:    * Indicates whether we have an unhandled wakeup call. This can
  73:    * be due to either wakeup() triggering a thread interruption while
  74:    * a thread was blocked in a select operation (in which case we need
  75:    * to reset this thread's interrupt status after interrupting the
  76:    * select), or else that no thread was on a select operation at the
  77:    * time that wakeup() was called, in which case the following select()
  78:    * operation should return immediately with nothing selected.
  79:    */
  80:   private boolean unhandledWakeup;
  81: 
  82:   public SelectorImpl (SelectorProvider provider)
  83:   {
  84:     super (provider);
  85: 
  86:     keys = new HashSet<SelectionKey> ();
  87:     selected = new HashSet<SelectionKey> ();
  88:   }
  89: 
  90:   protected void finalize() throws Throwable
  91:   {
  92:     close();
  93:   }
  94: 
  95:   protected final void implCloseSelector()
  96:     throws IOException
  97:   {
  98:     // Cancel any pending select operation.
  99:     wakeup();
 100: 
 101:     synchronized (keys)
 102:       {
 103:         synchronized (selected)
 104:           {
 105:             synchronized (cancelledKeys ())
 106:               {
 107:                 // FIXME: Release resources here.
 108:               }
 109:           }
 110:       }
 111:   }
 112: 
 113:   public final Set<SelectionKey> keys()
 114:   {
 115:     if (!isOpen())
 116:       throw new ClosedSelectorException();
 117: 
 118:     return Collections.unmodifiableSet (keys);
 119:   }
 120: 
 121:   public final int selectNow()
 122:     throws IOException
 123:   {
 124:     // FIXME: We're simulating an immediate select
 125:     // via a select with a timeout of one millisecond.
 126:     return select (1);
 127:   }
 128: 
 129:   public final int select()
 130:     throws IOException
 131:   {
 132:     return select (0);
 133:   }
 134: 
 135:   private final int[] getFDsAsArray (int ops)
 136:   {
 137:     int[] result;
 138:     int counter = 0;
 139:     Iterator<SelectionKey> it = keys.iterator ();
 140: 
 141:     // Count the number of file descriptors needed
 142:     while (it.hasNext ())
 143:       {
 144:         SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
 145: 
 146:         if ((key.interestOps () & ops) != 0)
 147:           {
 148:             counter++;
 149:           }
 150:       }
 151: 
 152:     result = new int[counter];
 153: 
 154:     counter = 0;
 155:     it = keys.iterator ();
 156: 
 157:     // Fill the array with the file descriptors
 158:     while (it.hasNext ())
 159:       {
 160:         SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
 161: 
 162:         if ((key.interestOps () & ops) != 0)
 163:           {
 164:             result[counter] = key.getNativeFD();
 165:             counter++;
 166:           }
 167:       }
 168: 
 169:     return result;
 170:   }
 171: 
 172:   public synchronized int select (long timeout)
 173:     throws IOException
 174:   {
 175:     if (!isOpen())
 176:       throw new ClosedSelectorException();
 177: 
 178:     synchronized (keys)
 179:       {
 180:         synchronized (selected)
 181:           {
 182:             deregisterCancelledKeys();
 183: 
 184:             // Set only keys with the needed interest ops into the arrays.
 185:             int[] read = getFDsAsArray (SelectionKey.OP_READ
 186:                                         | SelectionKey.OP_ACCEPT);
 187:             int[] write = getFDsAsArray (SelectionKey.OP_WRITE
 188:                                          | SelectionKey.OP_CONNECT);
 189: 
 190:             // FIXME: We dont need to check this yet
 191:             int[] except = new int [0];
 192: 
 193:             // Test to see if we've got an unhandled wakeup call,
 194:             // in which case we return immediately. Otherwise,
 195:             // remember our current thread and jump into the select.
 196:             // The monitor for dummy object selectThreadMutex regulates
 197:             // access to these fields.
 198: 
 199:             // FIXME: Not sure from the spec at what point we should
 200:             // return "immediately". Is it here or immediately upon
 201:             // entry to this function?
 202: 
 203:             // NOTE: There's a possibility of another thread calling
 204:             // wakeup() immediately after our thread releases
 205:             // selectThreadMutex's monitor here, in which case we'll
 206:             // do the select anyway. Since calls to wakeup() and select()
 207:             // among different threads happen in non-deterministic order,
 208:             // I don't think this is an issue.
 209:             synchronized (selectThreadMutex)
 210:               {
 211:                 if (unhandledWakeup)
 212:                   {
 213:                     unhandledWakeup = false;
 214:                     return 0;
 215:                   }
 216:                 else
 217:                   {
 218:                     selectThread = Thread.currentThread ();
 219:                   }
 220:               }
 221: 
 222:             // Call the native select() on all file descriptors.
 223:             int result = 0;
 224:             try
 225:               {
 226:                 begin();
 227:                 result = VMSelector.select (read, write, except, timeout);
 228:               }
 229:             finally
 230:               {
 231:                 end();
 232:               }
 233: 
 234:             // If our unhandled wakeup flag is set at this point,
 235:             // reset our thread's interrupt flag because we were
 236:             // awakened by wakeup() instead of an external thread
 237:             // interruption.
 238:             //
 239:             // NOTE: If we were blocked in a select() and one thread
 240:             // called Thread.interrupt() on the blocked thread followed
 241:             // by another thread calling Selector.wakeup(), then race
 242:             // conditions could make it so that the thread's interrupt
 243:             // flag is reset even though the Thread.interrupt() call
 244:             // "was there first". I don't think we need to care about
 245:             // this scenario.
 246:             synchronized (selectThreadMutex)
 247:               {
 248:                 if (unhandledWakeup)
 249:                   {
 250:                     unhandledWakeup = false;
 251:                     Thread.interrupted ();
 252:                   }
 253:                 selectThread = null;
 254:               }
 255: 
 256:             Iterator<SelectionKey> it = keys.iterator ();
 257: 
 258:             while (it.hasNext ())
 259:               {
 260:                 int ops = 0;
 261:                 SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
 262: 
 263:                 // If key is already selected retrieve old ready ops.
 264:                 if (selected.contains (key))
 265:                   {
 266:                     ops = key.readyOps ();
 267:                   }
 268: 
 269:                 // Set new ready read/accept ops
 270:                 for (int i = 0; i < read.length; i++)
 271:                   {
 272:                     if (key.getNativeFD() == read[i])
 273:                       {
 274:                         if (key.channel () instanceof ServerSocketChannelImpl)
 275:                           {
 276:                             ops = ops | SelectionKey.OP_ACCEPT;
 277:                           }
 278:                         else
 279:                           {
 280:                             ops = ops | SelectionKey.OP_READ;
 281:                           }
 282:                       }
 283:                   }
 284: 
 285:                 // Set new ready write ops
 286:                 for (int i = 0; i < write.length; i++)
 287:                   {
 288:                     if (key.getNativeFD() == write[i])
 289:                       {
 290:                         if (key.channel() instanceof SocketChannel)
 291:                           {
 292:                             if (((SocketChannel) key.channel ()).isConnected ())
 293:                               ops = ops | SelectionKey.OP_WRITE;
 294:                             else
 295:                               ops = ops | SelectionKey.OP_CONNECT;
 296:                           }
 297:                         else
 298:                           ops = ops | SelectionKey.OP_WRITE;
 299:                       }
 300:                   }
 301: 
 302:                 // FIXME: We dont handle exceptional file descriptors yet.
 303: 
 304:                 // If key is not yet selected add it.
 305:                 if (!selected.contains (key))
 306:                   {
 307:                     selected.add (key);
 308:                   }
 309: 
 310:                 // Set new ready ops
 311:                 key.readyOps (key.interestOps () & ops);
 312:               }
 313:             deregisterCancelledKeys();
 314: 
 315:             return result;
 316:           }
 317:         }
 318:   }
 319: 
 320:   public final Set<SelectionKey> selectedKeys()
 321:   {
 322:     if (!isOpen())
 323:       throw new ClosedSelectorException();
 324: 
 325:     return selected;
 326:   }
 327: 
 328:   public final Selector wakeup()
 329:   {
 330:     // IMPLEMENTATION NOTE: Whereas the specification says that
 331:     // thread interruption should trigger a call to wakeup, we
 332:     // do the reverse under the covers: wakeup triggers a thread
 333:     // interrupt followed by a subsequent reset of the thread's
 334:     // interrupt status within select().
 335: 
 336:     // First, acquire the monitor of the object regulating
 337:     // access to our selectThread and unhandledWakeup fields.
 338:     synchronized (selectThreadMutex)
 339:       {
 340:         unhandledWakeup = true;
 341: 
 342:         // Interrupt any thread which is currently blocked in
 343:         // a select operation.
 344:         if (selectThread != null)
 345:           selectThread.interrupt ();
 346:       }
 347: 
 348:     return this;
 349:   }
 350: 
 351:   private final void deregisterCancelledKeys()
 352:   {
 353:     Set<SelectionKey> ckeys = cancelledKeys ();
 354:     synchronized (ckeys)
 355:     {
 356:       Iterator<SelectionKey> it = ckeys.iterator();
 357: 
 358:       while (it.hasNext ())
 359:         {
 360:           keys.remove ((SelectionKeyImpl) it.next ());
 361:           it.remove ();
 362:         }
 363:     }
 364:   }
 365: 
 366:   protected SelectionKey register (SelectableChannel ch, int ops, Object att)
 367:   {
 368:     return register ((AbstractSelectableChannel) ch, ops, att);
 369:   }
 370: 
 371:   protected final SelectionKey register (AbstractSelectableChannel ch, int ops,
 372:                                          Object att)
 373:   {
 374:     SelectionKeyImpl result;
 375: 
 376:     if (ch instanceof SocketChannelImpl)
 377:       result = new SocketChannelSelectionKey (ch, this);
 378:     else if (ch instanceof DatagramChannelImpl)
 379:       result = new DatagramChannelSelectionKey (ch, this);
 380:     else if (ch instanceof ServerSocketChannelImpl)
 381:       result = new ServerSocketChannelSelectionKey (ch, this);
 382:     else if (ch instanceof gnu.java.nio.SocketChannelImpl)
 383:       result = new gnu.java.nio.SocketChannelSelectionKeyImpl((gnu.java.nio.SocketChannelImpl)ch, this);
 384:     else
 385:       throw new InternalError ("No known channel type");
 386: 
 387:     synchronized (keys)
 388:       {
 389:         keys.add (result);
 390: 
 391:         result.interestOps (ops);
 392:         result.attach (att);
 393:       }
 394: 
 395:     return result;
 396:   }
 397: }