最近一直在优化海量数据(几千万)处理这一块。我使用的是java提供的ExecuterPool线程池来实现的,这几天在研究如何使用生产者和消费者模式去解决类似处理数据的问题,下面是思考与实现的过程~
思考
简单的介绍下生产者与消费者模式,详细的可以去google。
吃过快餐肯定会遇到这样的场景:
你去打土豆丝,拿着大勺的大妈就会往你的盘子里放上一勺土豆丝,后厨的师傅会时不时的把做好的土豆丝端上来,有时候你去晚了,然而土豆丝师傅还在做,你又很想吃,那就只能稍等一会了,有时候人很多,那么可能就会有两三个大妈负责盛菜。
好了,来分析下上面的场景,一些名词在下面的程序中有出现
- 生产者(Producer)负责生产数据即厨师抄土豆丝;
- 消费者(Ponsumer)负责处理数据即“你”吃土豆丝;
(这里可能要把“大妈就往你的盘子里放上一勺土豆丝”作为消费者的行为,具体需要个人去体会,这里只是方便理解) - 缓冲区(Storage)负责数据的缓存即大妈身旁的菜盘子;
好的,然后回到处理数据的问题上,我简单画了一下过程:
如果你有处理过数据,这个过程你肯定会遇到
- 黑色表示数据的流动:读数据->存到集合中->处理数据->存数据;
- 蓝色的表示各个环节的耗时操作
读数据时间、处理数据时间、存数据时间; - 红色表示的是需要优化的地方,大体包括软件(代码)与硬件(cpu个数与内存大小)。
生产者消费者的实现
下面是看了http://blog.chinaunix.net/uid-20680669-id-3602844.html博客 之后结合上图写出的代码。
Storage
1 | public class Storage { |
Producer
1 | public class Producer implements Runnable { |
Consumer
1 | public class Consumer implements Runnable { |
启动项
1 | public static void main(String[] args) { |
以上代码你可以直接复制到自己的ide中直接启动~,之后我会再github上创建个repo管理这些代码
下面是结果
1 | 数据size: 61 |
代码解读
这里创建了一个生产者线程,十个消费者线程,生产者每次随机生成数据模拟从数据库读数据并存入cacheList中,直到产生的为0的时候,意味着数据库中没有需要处理的数据了。十个消费者分别取处理这些数据。
关于生产者与消费者的实现的方式现在有
(1)wait() / notify()方法
(2)await() / signal()方法
(3)BlockingQueue阻塞队列方法
(4)PipedInputStream / PipedOutputStream
但是上面的思路不变。
上面可以理解为一个处理数据的框架,以后处理数据直接填充就ok了~
值得优化的地方
其实对于处理海量数据这块,可能优化工作占得比重比较大。就个人经历讲一下方法。
首先你需要把这三个过程各自消耗的时间统计出来,比如表中有1000w数据,那就先统计下处理10w或者20w所需要的时间,这里强调一下总数据量是1000w和10w分别去处理10w条数据消耗的时间是两码事!两码事!两码事!特别是有要关联其他表的时候,不信?自己去测试下!
- 如果是processTime占用的时间久,那就去优化代码,是否有更优的排序算法、去重算法等等,这就要看实际的算法需求了。
- 如果是readTime的时间长,那就要控制一下每一次读取的数据量。如果你需要关联好几张表,可以再试一下使用join和不使用join查询的时间。
- 如果是writeTime的时间长,就要控制一下每一次存入数据库的数据量。
这里强调一下,数据的写一定要使用批量写的方式!!!
与硬件有关的优化
根据机器的cpu核数来确定代码中开的线程数,如果线程开多了,各线程之间的切换也需要消耗时间,具体的可参看博客http://ifeve.com/how-to-calculate-threadpool-size/,我是按照下面规则去设置线程池的大小
- 如果是CPU密集型应用,则线程池大小设置为N+1
- 如果是IO密集型应用,则线程池大小设置为2N+1
然后内存也需要考虑,因为你有把数据存入缓存的,数据量要控制好,不能把内存撑爆了。
如果配置低了,那就申请升级配置,如8G内存,四核处理器~
上面如果你都尝试了,但是任然需要很久的时间,这个时候那就需要加机器,比如开四台机器来处理1000w数据,这个就要使用分页的思想把数据分成四块去处理~
总结
处理海量数据的过程还是能学到很多知识,从软件到硬件,从算法到jvm等等。
以上写的就是自己的一点点经验,能帮到你的话点个赞~代码在Github