自己动手写分布式搜索引擎
上QQ阅读APP看书,第一时间看更新

3.2.12 多线程写索引

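Lucene 的 IndexWriter 本身是线程安全的，但 addDocument 等方法在调用它的线程中同步执行，所以如果程序只用一个线程调用，就只有一个线程在写索引；此外，同一时刻一个索引只能被一个 IndexWriter 打开（由写锁保证）。下面的 ThreadedIndexWriter 继承 IndexWriter，把文档的添加和更新交给线程池，由多个线程并发执行：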

        public class ThreadedIndexWriter extends IndexWriter {


          private ExecutorService threadPool;
          private Analyzer defaultAnalyzer;


          private class Job implements Runnable {           //保留要加入索引的一个文档
            Document doc;
            Analyzer analyzer;
            Term delTerm;
            public Job(Document doc, Term delTerm, Analyzer analyzer) {
              this.doc = doc;
              this.analyzer = analyzer;
              this.delTerm = delTerm;
            }
            public void run() {                        //实际增加和更新文档
              try {
              if (delTerm ! = null) {
                ThreadedIndexWriter.super.updateDocument(delTerm, doc, analyzer);
              } else {
                ThreadedIndexWriter.super.addDocument(doc, analyzer);
              }
              } catch (IOException ioe) {
              throw new RuntimeException(ioe);
      }
    }
  }


  public ThreadedIndexWriter(Directory dir, Analyzer a,
                          boolean create, int numThreads,
                          int maxQueueSize, IndexWriter.MaxFieldLength mfl)
       throws CorruptIndexException, IOException {
    super(dir, a, create, mfl);
    defaultAnalyzer = a;
    threadPool = new ThreadPoolExecutor(             //创建线程池
          numThreads, numThreads,
          0, TimeUnit.SECONDS,
          new ArrayBlockingQueue<Runnable>(maxQueueSize, false),
          new ThreadPoolExecutor.CallerRunsPolicy());
  }


  public void addDocument(Document doc) {             //让线程池增加文档
    threadPool.execute(new Job(doc, null, defaultAnalyzer));
  }


  public void addDocument(Document doc, Analyzer a) {   //让线程池增加文档
    threadPool.execute(new Job(doc, null, a));
  }


  public void updateDocument(Term term, Document doc) {   //让线程池更新文档
    threadPool.execute(new Job(doc, term, defaultAnalyzer));
  }


  //让线程池更新文档
  public void updateDocument(Term term, Document doc, Analyzer a) {
    threadPool.execute(new Job(doc, term, a));
  }


  public void close()
      throws CorruptIndexException, IOException {
    finish();
    super.close();
  }


  public void close(boolean doWait)
      throws CorruptIndexException, IOException {
    finish();
    super.close(doWait);
  }


  public void rollback()
            throws CorruptIndexException, IOException {
          finish();
          super.rollback();
        }


        private void finish() {       //关闭线程池
          threadPool.shutdown();
          while(true) {
            try {
              if (threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
                break;
              }
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new RuntimeException(ie);
            }
          }
        }
      }

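多线程写索引时，每个线程应使用各自的 Document 对象，不要在多个线程之间共享同一个 Document。使用上面的 ThreadedIndexWriter 时还要注意：addDocument 和 updateDocument 通常只是把文档放入队列就立即返回，文档可能稍后才由线程池中的线程处理，因此调用之后不能再修改或复用传入的 Document。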