Thursday, May 26, 2011

RVM Thread Pool

package org.jikesrvm.compilers.opt;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;


import org.jikesrvm.scheduler.SystemThread;
import org.vmmagic.pragma.NonMoving;

@NonMoving
public class OptCompilerThread
{
    private final int nThreads;
    private final PoolWorker[] threads;
    private final OptCompilerBlockingQueue queue;
    public OptCompilerThread(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new OptCompilerBlockingQueue();
        threads = new PoolWorker[nThreads];

        for (int i=0; i<nThreads; i++) {
            threads[i] = new PoolWorker(i);
            threads[i].start();
        }
    }

  
   

    public void execute(Runnable r) throws InterruptedException {
        synchronized(queue) {
          
           
            queue.add(r);
            queue.notify();
        }
    }
   
   
    public synchronized void shutdown(){
        for(int i=0; i<nThreads; i++){
            threads[i].stop(new IllegalStateException("ThreadPool is stopped"));
           
        }
      }
   

    @NonMoving
    private class PoolWorker extends SystemThread {
    private final ArrayBlockingQueue<Runnable> handoffBox = new ArrayBlockingQueue<Runnable>(1);

    
       
       
        protected PoolWorker(int ThreadCountN) {
            super("CompilerThread" + ThreadCountN);
        }

        @Override
        public void run() {
            Runnable r;
   
            while (true) {
                synchronized(queue) {
                    while (queue.isEmpty()) {
                        try
                        {
                            queue.wait();
                           
                        }
                        catch (InterruptedException ignored)
                        {
                           
                        }
                    }

                    try {
                        r = (Runnable) queue.remove();
                        handoffBox.add(r);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
               
                synchronized(handoffBox)
                {
                    while(handoffBox.isEmpty()){
                        try
                        {
       
                            handoffBox.wait();
                           
                        }
                        catch (InterruptedException ignored)
                        {
                        }
                    }
                    r = (Runnable) handoffBox.remove();
                   
                   
                }

                // If we don't catch RuntimeException,
                // the pool could leak threads
                try {
                    r.run();
                }
                catch (RuntimeException e) {
                    // You might want to log something here
                }
               
             
            
               
       
               
            }
        }
    }
}




/**
 * Minimal FIFO blocking queue built on an intrinsic monitor.
 *
 * <p>Created with {@link #OptCompilerBlockingQueue()} the queue is unbounded
 * and {@link #add(Object)} never blocks. Created with
 * {@link #OptCompilerBlockingQueue(int)} it holds at most {@code limit} items
 * and {@code add} blocks while it is full. {@link #remove()} blocks while the
 * queue is empty. Every state change calls {@code notifyAll()} so that threads
 * blocked in {@code add} or {@code remove} are actually woken up.
 */
class OptCompilerBlockingQueue {

      private final List<Object> queue = new LinkedList<Object>();

      /** Capacity of a bounded queue; unused when {@code unbounded} is true. */
      private final int limit;

      /** True when the queue was created without a capacity limit. */
      private final boolean unbounded;

      /**
       * Creates a bounded queue.
       *
       * @param limit maximum number of queued items; expected to be positive
       *        (with a limit of 0 the very first {@code add} would block)
       */
      public OptCompilerBlockingQueue(int limit){
        this.limit = limit;
        this.unbounded = false;
      }

      /** Creates an unbounded queue. */
      public OptCompilerBlockingQueue()
      {
        this.limit = 0;
        this.unbounded = true;
      }

      /**
       * @return {@code true} if no item is currently queued
       */
      public synchronized boolean isEmpty()
      {
        return queue.isEmpty();
      }

      /**
       * Appends an item, blocking first while a bounded queue is full.
       *
       * @param item the item to append
       * @throws InterruptedException if interrupted while waiting for free space
       */
      public synchronized void add(Object item)
      throws InterruptedException  {
        if (!unbounded) {
            while (queue.size() == limit) {
                wait();
            }
        }
        queue.add(item);
        // Wake consumers blocked in remove(); without this they sleep forever.
        notifyAll();
      }

      /**
       * Removes and returns the oldest item, blocking while the queue is empty.
       *
       * @return the item at the head of the queue
       * @throws InterruptedException if interrupted while waiting for an item
       */
      public synchronized Object remove()
      throws InterruptedException{
        while (queue.isEmpty()) {
            wait();
        }
        Object head = queue.remove(0);
        // Wake producers blocked in add() on a full bounded queue.
        notifyAll();
        return head;
      }
}
------------------------------------------------------------------------------------------------

package org.jikesrvm.compilers.opt;


import org.jikesrvm.compilers.opt.ir.Register;
import org.vmmagic.pragma.NonMoving;

@NonMoving
public class ParallelOptCompiler implements ParallelOptCompilerPass {
   

   
    public Runnable runPassOnSingleRegister(final Register reg) {
        return new Runnable(){

            public void run() {
                reg.putSSA((reg.defList != null && reg.defList.getNext() == null));
            }
       
        };
       
    }
}


-----------------------------------------------------------------------------------------


package org.jikesrvm.compilers.opt;

import org.jikesrvm.compilers.opt.ir.Register;

/**
 * A compiler pass expressed as one independent task per register, so that the
 * tasks can be handed to worker threads.
 */
public interface ParallelOptCompilerPass {
    /**
     * Creates the task that applies this pass to a single register.
     *
     * @param reg the register to process
     * @return a {@link Runnable} performing the pass on {@code reg}
     */
    Runnable runPassOnSingleRegister(Register reg);
}

---------------------------------------------------------------------------------------


        OptCompilerThread thread = new OptCompilerThread(2);
        ParallelOptCompiler pass = new ParallelOptCompiler();
        for (Register reg = ir.regpool.getFirstSymbolicRegister(); reg != null; reg = reg.getNext()) {
            try {
                thread.execute(pass.runPassOnSingleRegister(reg));
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
       
        thread.shutdown();

1 comment:

  1. It seems like this is reinventing a lot of infrastructure from, say, j.u.c (java.util.concurrent) and, for non-obvious reasons, making the thread objects NonMoving (which is in general bad).

    An experiment you may want to try is attaching a profiler to the boot image writer and seeing why the opt compiler performance doesn't scale linearly. At the time of Christos' MSc the stock JVM profilers weren't good enough, but I imagine life is better now.

    ReplyDelete