1.5 使用多线程技术提升爬虫性能
上一节讲述爬虫架构时曾经提到过,为了提升爬虫性能,需要采用多线程的爬虫技术。开源软件Heritrix已经采用了多线程的爬虫技术来提高性能,而且很多大型网站都采用多个服务器镜像的方式以提供同样的网页内容。采用多线程并行抓取能同时获取同一个网站的多个服务器中的网页,这样能极大地减少抓取这类网站的时间。
1.5.1 详解Java多线程
1. 创建多线程的方法
多线程是一种机制,它允许在程序中并发执行多个指令流,每个指令流都称为一个线程,彼此间互相独立。
线程又称为轻量级进程,它和进程一样拥有独立的执行控制,由操作系统负责调度。区别在于线程没有独立的存储空间,而是和所属进程中的其他线程共享存储空间,这使得线程间的通信较进程简单。
多个线程的执行是并发的,即在逻辑上是“同时”的。如果系统只有一个CPU,那么真正的“同时”是不可能的,但是由于CPU切换的速度非常快,用户感觉不到其中的区别,因此用户感觉到线程是同时执行的。
为了创建一个新的线程,需要做哪些工作呢?很显然,必须指明这个线程所要执行的代码。在Java语言中,通过JDK提供的java.lang.Thread类或者java.lang.Runable接口,能够轻松地添加线程代码。
具体如何实现线程执行的代码呢?先看一看java.lang.Thread类。java.lang.Thread类最重要的方法是run(),它被java.lang.Thread类的方法start()所调用,提供线程所要执行的代码。也就是说,run()方法里面的代码就是线程执行时所运行的代码。
再来看java.lang.Runable接口,这个接口中只有一个run()方法,和java.lang.Thread类中的run()方法类似,java.lang.Runable接口的run()方法中的代码就是线程执行的代码。
因此,在Java语言中,要创建一个线程,可以使用以下两种方法。
方法一:继承java.lang.Thread类,覆盖方法run(),在创建的java.lang.Thread类的子类中重写run()方法。下面是一个例子:
public class MyThread extends Thread { int count= 1, number; public MyThread(int num) { number = num; System.out.println("创建线程 " + number); } public void run() { while(true) { System.out.println("线程 " + number + ":计数 " + count); if(++count== 6) return; } } public static void main(String args[]){ for(int i = 0; i<5; i++) new MyThread(i+1).start(); ) }
这种方法简单明了,但是,它也有一个很大的缺陷,如果线程类MyThread已经从一个类继承(如小程序必须继承自Applet类),而无法再继承java.lang.Thread类应该怎么办呢?这时,就必须使用下面的方法二来实现线程。
方法二:实现java.lang.Runnable接口。
java.lang.Runnable接口只有一个run()方法,库创建一个类实现java.lang.Runnable接口并提供这一方法的实现,将线程代码写入run()方法中,并新建一个java.lang.Thread类,将实现java.lang.Runnable的类作为参数传入,就完成了创建新线程的任务。下面是一个例子:
public class MyThread implements Runnable { int count= 1, number; public MyThread(int num) { number = num; System.out.println("创建线程 " + number); } public void run() { while(true) { System.out.println ("线程 " + number + ":计数 " + count); if(++count== 6) return; } } public static void main(String args[]) { for(int i = 0; i < 5; i++) new Thread(new MyThread(i+1)).start(); } }
严格地说,创建java.lang.Thread子类的实例也是可行的,但必须注意的是,该子类不能覆盖java.lang.Thread类的run()方法,否则该线程执行的将是子类的run()方法,而不是执行实现java.lang.Runnable接口的类的run()方法,对此读者不妨试验一下。
方法二使得能够在一个类中包容所有的代码,有利于封装。这种方法的缺点在于,只能使用一套代码,若想创建多个线程并使各个线程执行不同的代码,则必须额外创建类。如果这样的话,在大多数情况下也许还不如直接用多个类分别继承java.lang.Thread来得紧凑。
2. Java语言对线程同步的支持
下面首先讲解线程的状态。一个线程具有以下四种状态。
● 新状态:线程已被创建但尚未执行(start()方法尚未被调用)。
● 可执行状态:线程可以执行,但不一定正在执行。CPU时间随时可能被分配给该线程,从而使得它执行。
● 死亡状态:正常情况下,run()方法返回使得线程死亡。调用java.lang.Thread类中的stop()或destroy()方法亦有同样的效果,但是不推荐使用这两种方法,前者会产生异常,后者是强制终止,不会释放锁。
● 阻塞状态:线程不会被分配CPU时间,无法执行。
编写多线程程序通常会遇到线程的同步问题,什么是线程同步问题呢?
由于同一进程的多个线程共享存储空间,在带来方便的同时,也会带来访问冲突这个严重的问题。Java语言提供了专门机制以解决这种冲突,有效地避免了同一个数据对象被多个线程同时访问的问题。
Java语言中解决线程同步问题是依靠synchronized关键字来实现的,它包括两种用法:synchronized()方法和synchronized块。
1)synchronized()方法
通过在方法声明中加入synchronized关键字来声明该方法是同步方法,即多线程执行的时候各个线程之间必须顺序执行,不能同时访问该方法。例如:
public synchronized void accessVal(int newVal);
在Java语言中,每个对象都拥有一把锁,当执行这个对象的synchronized()方法时,必须获得该对象的锁方能执行,否则所属线程阻塞。而synchronized()方法一旦执行,就独占该锁,直到从synchronized()方法返回时才将锁释放,之后被阻塞的线程方能获得该锁,重新进入可执行状态。
这种对象锁机制确保同一时刻对于每一个对象,其所有声明为synchronized的方法中至多只有一个处于可执行状态,从而有效地避免了类成员变量的访问冲突。
在Java中,不仅是对象,每一个类都对应一把锁,因此也可将类的静态成员函数声明为synchronized,以控制其对类的静态成员变量的访问。
synchronized()方法的缺陷:若将一个执行时间较长的方法声明为synchronized,将会大大影响程序运行的效率。因此Java为我们提供了更好的解决办法,那就是synchronized块。
2)synchronized块
通过synchronized关键字来声明synchronized块。语法如下:
synchronized(syncObject) { //允许访问控制的代码 }
synchronized块是这样一种代码块,块的代码必须获得syncObject对象(如前所述,可以是类实例或类)的锁才能执行。由于synchronized块可以是任意代码块,且可任意指定锁的对象,因此灵活性较高。
3. Java语言对线程阻塞的支持
讲完了线程的同步机制,下面介绍Java语言对线程阻塞机制的支持。
阻塞指的是暂停一个线程的执行以等待某个条件发生(如等待资源就绪),学习过操作系统的读者对它一定非常熟悉。Java提供了大量方法来支持阻塞,下面逐一进行分析。
● sleep()方法:sleep()允许指定以毫秒为单位的一段时间作为参数,它使得线程在指定的时间内进入阻塞状态,不能得到CPU时间片,指定的时间一过,线程重新进入可执行状态。例如,当线程等待某个资源就绪时,测试发现条件不满足后,让线程sleep()一段时间后重新测试,直到条件满足为止。
● suspend()和resume()方法:两个方法配套使用,suspend()方法使线程进入阻塞状态,并且不会自动恢复,必须对其应用resume()方法,才能使得线程重新进入可执行状态。例如,当前线程等待另一个线程产生的结果时,如果发现结果还没有产生,会调用suspend()方法,另一个线程产生了结果后,调用resume()方法使其恢复。
● yield()方法:yield()方法使得线程放弃当前分得的CPU时间片,但不使线程阻塞,即线程仍处于可执行状态,随时可能再次分得CPU时间。
● wait()和notify()方法:两个方法配套使用,wait()方法可以使线程进入阻塞状态,它有两种形式,一种允许指定以毫秒为单位的一段时间作为参数,另一种没有参数。前者当对应的notify()方法被调用或者超出指定时间时,线程重新进入可执行状态;后者则必须对应的notify()方法被调用时才可进入执行状态。
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其他更多资源。线程对象也不例外。当前,比较流行的一种技术是“池化技术”,即在系统启动的时候一次性创建多个对象并且保存在一个“池”中,当需要使用的时候直接从“池”中取得而不是重新创建。这样可以大大提高系统性能。
Java语言在JDK 1.5以后的版本中提供了一个轻量级线程池——ThreadPool。可以使用线程池来执行一组任务。简单的任务没有返回值,如果主线程需要获得该线程的返回值,可以使任务实现Callable接口,线程池执行任务并通过Future的实例返回线程的执行结果。Callable和java.lang.Runnable的区别如下。
● Callable定义的方法是call(),而Runnable定义的方法是run()。
● Callable的call()方法可以有返回值,而Runnable的run()方法不能有返回值。
● Callable的call()方法可以抛出异常,而Runnable的run()方法不能抛出异常。
Future表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。Future的cancel()方法取消任务的执行,cancel()方法有一个布尔参数,参数为true表示立即中断任务的执行,参数为false表示允许正在运行的任务运行完成。Future的get()方法等待计算完成,并获取计算结果。
下面的例子使用ThreadPool实现并行下载网页。在继承Callable方法的任务类中下载网页的实现如下:
public class DownLoadCall implements Callable<String> { private URL url; // 待下载的URL public DownLoadCall(URL u){ url = u; } @Override public String call() throws Exception { String content = null; //下载网页 return content; } }
主线程类创建ThreadPool并执行下载任务的实现如下:
int threads = 4; //并发线程数量 ExecutorService es = Executors.newFixedThreadPool(threads);//创建线程池 Set<Future<String>> set = new HashSet<Future<String>>(); for (final URL url : urls) { DownLoadCall task = new DownLoadCall(url); Future<String[]> future = es.submit(task);//提交下载任务 set.add(future); } //通过future对象取得结果 for (Future<String> future : set) { String content = future.get(); //处理下载网页的结果 }
采用线程池可以充分利用多核CPU的计算能力,并且简化了多线程的实现。
1.5.2 爬虫中的多线程
多线程爬虫的结构如图1.14所示。
图1.14 多线程爬虫的结构
对并行爬虫架构而言,处理空队列要比序列爬虫更加复杂。空的队列并不意味着爬虫已经完成了工作,因为此刻其他的进程或者线程可能依然在解析网页,并且马上会加入新的URL。进程或者线程管理员需要给报告队列为空的进程/线程发送临时的休眠信号来解决这类问题。线程管理员需要不断跟踪休眠线程的数目,只有当所有的线程都休眠的时候,爬虫才可以终止。
1.5.3 一个简单的多线程爬虫实现
以下是一个多线程爬虫程序的主线程部分和子线程部分,主线程启动子线程并等待所有子线程执行完成后才退出,实现代码如下:
threadList = new ArrayList<Thread>(THREAD_NUM); for (int i = 0; i < THREAD_NUM; i++) { Thread t = new Thread(this, "Spider Thread #" + (i+1)); t.start(); threadList.add(t); } //当前线程等待子线程退出 while (threadList.size() > 0) { Thread child = (Thread)threadList.remove(0); child.join(); //等待这个线程执行完 }
子线程主要的执行程序如下:
//从TODO取出要分析的URL地址,同时把它放入Visited表 public synchronized NewsSource dequeueURL() throws Exception { while (true) { if (!todo.isEmpty()) { NewsSource newitem = (NewsSource)todo.removeFirst(); visited.add(newitem.URL,newitem.source); return newitem; } else { threads--; //等待线程数的计数器减1 if (threads > 0) {//如果仍然有其他的线程在活动则等待 wait(); threads++;//等待线程数的计数器加1 } else {//如果其他线程都在等待,则通知所有在等待的线程集体退出 notifyAll(); return null; } } } } // enqueueURL把新发现的URL放入TODO表 public synchronized void enqueueURL(NewsSource newitem) { if (!visited.contains(newitem.URL)) { todo.add(newitem); visited.add(newitem.URL,newitem.source); notifyAll();//唤醒在等待的线程 } } public void run() { NewsSource item; try { while ((item = dequeueURL()) != null) { indexURL(item);//包含把新的URL放入TODO表的过程 } } catch(Exception e) { e.printStackTrace(); } threads--; }
1.5.4 详解Heritrix多线程结构
要想更有效、更快速地抓取网页内容,必须采用多线程抓取。开源软件Heritrix采用传统的线程模型实现了一个标准的线程池ToePool,它用于管理所有的抓取线程。ToeThread则继承了Thread类,并实现了run()方法。
ToePool和ToeThread都位于org.archive.crawler.framework包中。ToePool的初始化是在CrawlController的initialize()方法中完成的。以下是在CrawlController中用于对ToePool进行初始化的代码。
//构造函数 toePool = new ToePool(this); //按order.xml中的配置,实例化并启动线程 toePool.setSize(order.getMaxToes());
ToePool的构造函数很简单,如下所示:
public ToePool(CrawlController c) { super("ToeThreads"); this.controller = c; }
它仅仅是调用了父类java.lang.ThreadGroup的构造函数,同时,将注入的CrawlController赋给类变量。这样,便建立起一个线程池的实例。
真正的工作线程在线程池中的setSize(int)方法中创建。从名称上看,这个方法很像一个普通的赋值方法,但实际上,这个方法调整了抓取的线程数量。代码如下:
public void setSize(int newsize) { targetSize = newsize; int difference = newsize - getToeCount(); // 如果发现线程池中的实际线程数量小于应有的数量 // 则启动新的线程 if (difference > 0) { for(int i = 1; i <= difference; i++) { // 启动新线程 startNewThread(); } } // 如果线程池中的线程数量已经达到需要 else { int retainedToes = targetSize; // 将线程池中的线程管理起来放入数组中 Thread[] toes = this.getToes(); // 循环去除多余的线程 for (int i = 0; i < toes.length ; i++) { if(!(toes[i] instanceof ToeThread)) { continue; } retainedToes--; if (retainedToes>=0) { continue; } ToeThread tt = (ToeThread)toes[i]; tt.retire(); // ToeThread中定义的方法,通知这个线程尽早结束 } } } // 用于取得所有属于当前线程池的线程 private Thread[] getToes() { Thread[] toes = new Thread[activeCount()+10]; // 由于ToePool继承自java.lang.ThreadGroup类 // 因此当调用enumerate(Thread[] toes)方法时, // 实际上是将该ThreadGroup中开辟的所有线程放入 // toes这个数组中,以备后面的管理 this.enumerate(toes); return toes; } // 开启一个新线程 private synchronized void startNewThread() { ToeThread newThread = new ToeThread(this, nextSerialNumber++); newThread.setPriority(DEFAULT_TOE_PRIORITY); //设置线程优先级 newThread.start(); //启动线程 }
根据上面的代码可以得出这样的结论:线程池本身在创建的时候并没有任何活动的线程实例,只有当它的setSize()方法被调用时,才创建新线程;如果当setSize()方法被调用多次而传入不同的参数时,线程池会根据参数里设定值的大小来改变池中所管理的线程数量。当启动Toe线程后,执行的是其run()方法中的代码。通过run()方法中的代码可以看到ToeThread到底如何处理从Frontier中获得的要抓取的链接。
public void run() { String name = controller.getOrder().getCrawlOrderName(); logger.fine(getName()+" started for order '"+name+"'"); try { while ( true ) { // 检查是否应该继续处理 continueCheck(); setStep(STEP_ABOUT_TO_GET_URI); // 使用Frontier的next(),方法从Frontier中取出下一个要处理的链接 CrawlURI curi = controller.getFrontier().next(); // 同步当前线程 synchronized(this) { continueCheck(); setCurrentCuri(curi); } /* * 处理取出的链接 */ processCrawlUri(); setStep(STEP_ABOUT_TO_RETURN_URI); // 检查是否应该继续处理 continueCheck(); // 使用Frontier的finished()方法来对刚才处理的链接做收尾工作 // 比如将分析得到的新的链接加入等待队列中 synchronized(this) { controller.getFrontier().finished(currentCuri); setCurrentCuri(null); } // 后续的处理 setStep(STEP_FINISHING_PROCESS); lastFinishTime = System.currentTimeMillis(); //释放链接 controller.releaseContinuePermission(); if(shouldRetire) { break; // from while(true) } } } catch (EndedException e) { } catch (Exception e) { logger.log(Level.SEVERE,"Fatal exception in "+getName(),e); } catch (OutOfMemoryError err) { seriousError(err); } finally { controller.releaseContinuePermission(); } setCurrentCuri(null); // 清理缓存数据 this.httpRecorder.closeRecorders(); this.httpRecorder = null; localProcessors = null; logger.fine(getName()+" finished for order '"+name+"'"); setStep(STEP_FINISHED); controller.toeEnded(); controller = null; }
工作线程通过调用Frontier的next()方法取得下一个待处理的链接,然后对链接进行处理,并调用Frontier的finished()方法来收尾、释放链接,最后清理缓存,终止单步工作。另外,其中还有一些日志操作,主要是为了记录每次抓取的各种状态。以上代码中,最重要的语句是processCrawlUri(),它调用处理链对链接进行处理。