用Kotlin Coroutines简单改造原有的爬虫框架
日落的风景.jpg
NetDiscover 是一款基于 Vert.x、RxJava2 实现的爬虫框架。由于我最近正好在学习 Kotlin 的 Coroutines,在学习过程中尝试改造一下自己的爬虫框架。所以,我为它新增了一个板块:coroutines 板块。
一. 爬虫框架的基本原理:
对于单个爬虫而言,从消息队列 queue 中获取 request,而后通过下载器 downloader 完成网络请求并取得 html 的内容,通过解析器 parser 解析 html 的内容,而后由多个 pipeline 按照顺序执行操作。其中,downloader、queue、parser、pipeline 这些组件都是接口,爬虫框架里内置了它们很多实现。开发者可以根据自身情况来选择用或者者自己开发全新的实现。
basic_principle.png
下面响应式风格的代码反映了上图爬虫框架的基本原理:
// 从消息队列中取出request final Request request = queue.poll(name); ...... // request正在解决 downloader.download(request) .map(new Function<Response, Page>() { @Override public Page apply(Response response) throws Exception { Page page = new Page(); page.setRequest(request); page.setUrl(request.getUrl()); page.setStatusCode(response.getStatusCode()); if (Utils.isTextType(response.getContentType())) { // text/html page.setHtml(new Html(response.getContent())); return page; } else if (Utils.isApplicationJSONType(response.getContentType())) { // application/json // 将json字符串转化成Json对象,放入Page的"RESPONSE_JSON"字段。之所以转换成Json对象,是由于Json提供了toObject(),可以转换成具体的class。 page.putField(Constant.RESPONSE_JSON,new Json(new String(response.getContent()))); return page; } else if (Utils.isApplicationJSONPType(response.getContentType())) { // application/javascript // 转换成字符串,放入Page的"RESPONSE_JSONP"字段。 // 因为是jsonp,需要开发者在Pipeline中自行去掉字符串前后的内容,这样即可以变成json字符串了。 page.putField(Constant.RESPONSE_JSONP,new String(response.getContent())); return page; } else { page.putField(Constant.RESPONSE_RAW,response.getIs()); // 默认情况,保存InputStream return page; } } }) .map(new Function<Page, Page>() { @Override public Page apply(Page page) throws Exception { if (parser != null) { parser.process(page); } return page; } }) .map(new Function<Page, Page>() { @Override public Page apply(Page page) throws Exception { if (Preconditions.isNotBlank(pipelines)) { pipelines.stream() .forEach(pipeline -> pipeline.process(page.getResultItems())); } return page; } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Page>() { @Override public void accept(Page page) throws Exception { log.info(page.getUrl()); if (request.getAfterRequest()!=null) { request.getAfterRequest().process(page); } } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { log.error(throwable.getMessage()); } });其中,Downloader的download方法会返回一个Maybe<Response>。
import com.cv4j.netdiscovery.core.domain.Request;import com.cv4j.netdiscovery.core.domain.Response;import io.reactivex.Maybe;import java.io.Closeable;/** * Created by tony on 2017/12/23. */public interface Downloader extends Closeable { Maybe<Response> download(Request request);}正是由于这个 Maybe<Response> 对象,后续的一系列的链式调使用才显得非常自然。比方将Response转换成Page对象,再对Page对象进行解析,Page解析完毕之后做一系列的pipeline操作。
当然,在爬虫框架里还有 SpiderEngine 可以管理 Spider。
二. 用协程改造
协程是一种使用户态的轻量级线程,协程的调度完全由使用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。
因为 Kotlin Coroutines 依然是实验的API,所以我不打算在爬虫框架原有的 core 板块上进行改动。于是,新添加一个板块。
在新板块里,将之前的响应式风格的代码,改造成协程的方式。
Kotlin Coroutines 为各种基于 reactive streams 规范的库提供了工具类。可以在下面的github地址找到。
Kotlin/kotlinx.coroutines/tree/master/reactive
我在build.gradle中增加了
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.23.0' compile 'org.jetbrains.kotlinx:kotlinx-coroutines-rx2:0.23.0'注意,协程的版本号必需跟 Kotlin 的版本要相符和。我所用的 Kotlin 的版本是1.2.41
下面是修改之后的 Kotlin 代码,原有的各种组件接口仍然可以用。
// 从消息队列中取出request final Request request = queue.poll(name); ...... // request正在解决 val download = downloader.download(request).await() download?.run { val page = Page() page.request = request page.url = request.url page.statusCode = statusCode if (Utils.isTextType(contentType)) { // text/html page.html = Html(content) } else if (Utils.isApplicationJSONType(contentType)) { // application/json // 将json字符串转化成Json对象,放入Page的"RESPONSE_JSON"字段。之所以转换成Json对象,是由于Json提供了toObject(),可以转换成具体的class。 page.putField(Constant.RESPONSE_JSON, Json(String(content))) } else if (Utils.isApplicationJSONPType(contentType)) { // application/javascript // 转换成字符串,放入Page的"RESPONSE_JSONP"字段。 // 因为是jsonp,需要开发者在Pipeline中自行去掉字符串前后的内容,这样即可以变成json字符串了。 page.putField(Constant.RESPONSE_JSONP, String(content)) } else { page.putField(Constant.RESPONSE_RAW, `is`) // 默认情况,保存InputStream } page }?.apply { if (parser != null) { parser!!.process(this) } }?.apply { if (Preconditions.isNotBlank(pipelines)) { pipelines.stream() .forEach { pipeline -> pipeline.process(resultItems) } } }?.apply { println(url) if (request.afterRequest != null) { request.afterRequest.process(this) } }其中,download 变量返回了 Maybe<Response> 的结果。之后, run、apply 等 Kotlin 标准库的扩展函数替代了原价的 RxJava 的 map 操作。
Kotlin 的协程是无阻塞的异步编程方式。上面看似同步的代码,其实是异步实现的。
await() 方法是 Maybe 的扩展函数:
/** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this * maybe had produced error. * * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. */@Suppress("UNCHECKED_CAST")public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)因为 await() 方法是suspend修饰的,所以在上述代码的最外层还得加上一段代码,来创立协程。
runBlocking(CommonPool) { ...... }到此,完成了最初的改造,感兴趣的同学可以查看我的爬虫框架。
github地址: fengzhizi715/NetDiscovery
三. 小结
随着 Kotlin Coroutines 未来的正式发布,爬虫框架的 coroutines 板块也会考虑合并到 core 板块中。以及随着个人对 Kotlin Coroutines 的进一步认识和了解,也会考虑在更多的地方用 Coroutines ,例如 Vert.x 和 Kotlin Coroutines 相结合。
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 用Kotlin Coroutines简单改造原有的爬虫框架