Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务(工作窃取算法),最终汇总每个小任务结果后得到大任务结果的框架。
- Fork: 把一个大任务分成若干个子任务进行执行,设置一个阀值,判断任务数量,如果超越阀值,无线分割下去
- Join: 合并子任务获得最终结果
举例: 1+2+3+…+9999+10000,通过Fork将其分成10分,每份计算1000个数的加法运算。
关于工作窃取算法:
简单的来讲,工作窃取算法就是将一个大任务切分成若干份小任务,每个任务放入到不同的队列当中,每个队列有一个相应的工作线程来执行这些任务,当这些线程执行完当前的任务,线程闲置,为了提高效率以及避免浪费资源,利用这些线程窃取其他队列中的任务来执行已达到高效的目的。
队列一般设置成双端队列: 当前队列中的线程从队列的首端获取任务执行,而窃取的线程从队列的尾端获取任务执行,这种方式也有缺点,当队列中只有一个任务的时候,会造成资源竞争以及创建多个双端队列。
Fork Join框架中两个主要的类
- ForkJoinTask: 创建forkjoin任务,此类在任务中执行fork和join操作的机制,通常继承他的两个子类: RecursiveAction(没有返回结果)、RecursiveTask(有返回结果)
- ForkJoinPool: 任务需要通过此类来执行,任务分割出来的子任务会添加到当前的工作线程所维护的双端队列中,进入队列的头部。 当一个工作线程的队列里暂时没有任务时,他会随机从其他工作线程的队列的尾部获取任务。
Fork Join框架的优势(为什么要使用Fork/Join框架?)
- 多核处理器的执行效率,高效利用多核平台硬件
- 普通的多线程相对复杂
- 避免死锁(死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程)
一个简单的Fork/Join使用示例(使用fork/join计算1+2+3+4+5+6):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
package com.glorze.frokjoin; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * 使用fork/join框架计算1+2+3+4+5+6 * @ClassName ForkJoin * @author: 高老四 * @since: 2018年10月18日 下午2:01:06 */ public class ForkJoin extends RecursiveTask<Integer>{ /** * 序列化版本 */ private static final long serialVersionUID = 1L; /** * 设定一个阀值 */ private static final int VALUE = 3; private int start; private int end; public ForkJoin(int start, int end) { super(); this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任务小于当前阀值就计算任务,无需拆分 boolean canCompute = (end - start) <= VALUE; if(canCompute){ for(int i = start; i<= end; i++){ sum += i; } }else{ //如果任务大鱼阀值,裂变成两个任务 int middle = (start + end) / 2; ForkJoin leftForkJoin = new ForkJoin(start, middle); ForkJoin rightForkJoin = new ForkJoin(middle, end); //分别执行两个子任务 leftForkJoin.fork(); rightForkJoin.fork(); //合并子任务 int leftResult = leftForkJoin.join(); int rightResult = rightForkJoin.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); //计算1+2+3+4+5+6 ForkJoin forkJoin = new ForkJoin(1, 6); //执行 Future<Integer> result = forkJoinPool.submit(forkJoin); try { System.out.println(result.get()); } catch (Exception e) { e.printStackTrace(); } } } |
老四的目前所在公司的业务中也经常使用Fork/Join框架,不过是在JDK提供的原基础之上加以改动和封装了一点点。下面的例子是配合适配器设计模式来生成大量优惠券码,关于适配器模式可以参考老四的《浅析设计模式第十七章之适配器模式》这篇文章。
首先,创建fork接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
package com.glorze.coupon; import java.util.Map; /** * Fork接口 * @ClassName MethodAdapter * @author: 高老四 * @since: 2018年10月18日 下午6:38:20 * @param <P> * @param <V> */ public interface MethodAdapter<P,V> { /** * 判断是否可以执行该任务 * @Title: canCompute * @param data 要执行的任务数据 * @return boolean true: 可执行 false: 不可执行 * @author: glorze.com * @since: 2018年10月18日 下午6:40:01 */ public boolean canCompute(P data); /** * 执行任务 * @Title: compute * @param data 要执行的任务数据 * @param param 其他参数 * @return V 执行结果 * @author: glorze.com * @since: 2018年10月18日 下午6:38:59 */ public V compute(P data, Map<String, Object> param); /** * 当数据不符合阈值时,对数据进行拆分后,左侧的数据 * @Title: leftData * @param data 要执行的任务数据 * @return P 拆分后的左侧数据 * @author: glorze.com * @since: 2018年10月18日 下午6:41:18 */ public P leftData(P data); /** * 当数据不符合阈值时,对数据进行拆分后,右侧的数据 * @Title: rightData * @param data 要执行的任务数据 * @return P 拆分后的右侧数据 * @author: glorze.com * @since: 2018年10月18日 下午6:42:12 */ public P rightData(P data); } |
实现接口从而实现计算过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package com.glorze.coupon; import java.util.List; import java.util.Map; /** * 实现数据拆分重组 * @ClassName BaseListParamMethodAdapter * @author: 高老四 * @since: 2018年10月18日 下午6:57:09 * @param <V> */ public abstract class BaseListParamMethodAdapter<V> implements MethodAdapter<List, V> { /** * 集合的阈值 */ protected int threshold; public BaseListParamMethodAdapter(int threshold) { super(); this.threshold = threshold; } /** * 是否可计算 */ @Override public boolean canCompute(List data) { return (data.size()<=threshold)? true:false; } /** * 执行计算任务 * @Title: compute * @param data 要执行的任务数据 * @param param 其他参数 * @return V 执行结果 * @author: glorze.com * @since: 2018年10月18日 下午6:38:59 */ @Override public abstract V compute(List data, Map<String, Object> param); /** * 数据拆分后的左侧数据 */ @Override public List leftData(List data) { return data.subList(0, threshold); } /** * 数据拆分后的右侧数据 */ @Override public List rightData(List data) { return data.subList(threshold, data.size()); } } |
封装工具类,调用BaseListParamMethod
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
package com.glorze.coupon; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * 多线程并发执行任务工具类 * @ClassName ForkJoinTaskUtil * @author: 高老四 * @since: 2018年10月18日 下午7:03:57 * @param <P> * @param <V> */ public class ForkJoinTaskUtil<P,V> { private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); private P data; private Map<String, Object> param; private MethodAdapter<P,V> adapter; public ForkJoinTaskUtil(P data, Map<String, Object> param, MethodAdapter<P,V> adapter) { super(); this.data = data; this.param = param; this.adapter = adapter; } public List<V> computeResult(){ ForkJoinPool pool = new ForkJoinPool(); //生成计算任务 ComputeTask<P,V> task = new ComputeTask<P,V>(data, param, adapter); //执行一个计算任务 Future<List<V>> future = pool.submit(task); //获取计算结果 try { Date start = new Date(); System.out.println("多线程并发工具类[主线程]开始启用计算任务,当前时间:"+format.format(start)); List<V> result = future.get(); Date end = new Date(); System.out.println("多线程并发工具类[主线程]合并计算结果完毕,当前时间:"+format.format(end)+",共耗时:"+(end.getTime()-start.getTime())+"ms!"); return result; } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return null; } protected class ComputeTask<P,V> extends RecursiveTask<List<V>>{ private static final long serialVersionUID = 1L; private P data; private Map<String, Object> param; private MethodAdapter<P,V> adapter; public ComputeTask(P data, Map<String, Object> param, MethodAdapter<P,V> adapter) { super(); this.data = data; this.param = param; this.adapter = adapter; } @Override protected List<V> compute() { List<V> result = new ArrayList<V>(); if(adapter.canCompute(data)){ //任务够小直接执行 Date start = new Date(); System.out.println("["+Thread.currentThread()+"]开始计算任务,当前时间:"+format.format(start)); V obj = adapter.compute(data, param); Date end = new Date(); System.out.println("["+Thread.currentThread()+"]执行计算任务完毕,当前时间:"+format.format(end)+",共耗时:"+(end.getTime()-start.getTime())+"ms!"); result.add(obj); }else{ //任务超过阈值,分割任务 System.out.println("["+Thread.currentThread()+"]开始分割任务"); ComputeTask<P,V> left = new ComputeTask<P,V>(adapter.leftData(data), param, adapter); ComputeTask<P,V> right = new ComputeTask<P,V>(adapter.rightData(data), param, adapter); //执行子任务 Date start = new Date(); System.out.println("["+Thread.currentThread()+"]开始启用分割后的子任务,当前时间:"+format.format(start)); left.fork(); right.fork(); //等待子任务执行完 List<V> leftRes = left.join(); List<V> rightRes = right.join(); Date end = new Date(); System.out.println("["+Thread.currentThread()+"]子任务执行完毕,当前时间:"+format.format(end)+",共耗时:"+(end.getTime()-start.getTime())+"ms!"); //合并任务结果 result.addAll(leftRes); result.addAll(rightRes); } return result; } } } |
优惠券生成入库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
package com.glorze.coupon; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 优惠券处理类 * @ClassName CouponHandle * @author: 高老四 * @since: 2018年10月18日 下午7:12:53 */ public class CouponHandle { /** * 并发存储库存 * @Title: saveMCInventoryThread * @param record * @return Map<String,Integer> * @author: glorze.com * @since: 2018年10月18日 下午7:13:11 */ @Override public Map<String, Integer> saveMCInventoryThread(MerchantCoupon record) { // 要操作的所有优惠券库存集合 List<MerchantCouponInventoryDetail> data = new ArrayList<MerchantCouponInventoryDetail>(); // 拼装要生成的数据集合 // 获取merchantCouponInventoryDetail表code主键的最大值 Map<String, Object> codeMap = new HashMap<String, Object>(16); codeMap.put("couponSid", record.getCouponId()); Long codeId = merchantCouponInventoryDetailMapper.findCodeMax(codeMap); if (codeId == null) { codeId = (long) 0; } // 单条优惠券库存实体 MerchantCouponInventoryDetail inventory; for (int i = 0; i < record.getCreateNum(); i++) { // set各个公共的字段.. inventory = new MerchantCouponInventoryDetail(); inventory.setCode(codeId + i + 1); // 优惠券的sid inventory.setCouponSid(record.getCouponId()); // 优惠券类型 inventory.setCouponType(record.getCouponType()); // 优惠券标题 inventory.setCouponName(record.getCouponName()); // 商户ID inventory.setCustId(record.getCustId()); // 生效时间 inventory.setEffectStartTime(record.getEffectStartTime()); // 失效时间 inventory.setEffectEndTime(record.getEffectEndTime()); // 设置为生效状态 inventory.setStatus("1"); // 设置为未核销状态 inventory.setIsUse("2"); if (record.getStoreInfoSids() != null) { // 设置门店ID inventory.setCrmStoreInfoSid(Long.parseLong(record.getStoreInfoSids())); } // 创建日期 inventory.setCreateTime(record.getCreateTime()); data.add(inventory); } // 构造业务调用封装对象 BaseListParamMethodAdapter<Map<String, Integer>> adapter = new BaseListParamMethodAdapter<Map<String, Integer>>(200) { @Override public Map<String, Integer> compute(List data, Map<String, Object> param) { return insertInventDetails(data); } }; // 构造ForkJoinTaskUtil参数 ForkJoinTaskUtil util = new ForkJoinTaskUtil(data, null, adapter); // 获取执行结果 List<Map<String, Integer>> result = util.computeResult(); // 合并计算结果 System.out.println("[主线程]开始获取并合并计算结果.."); int successcount = 0; int failcount = 0; for (Map<String, Integer> map : result) { successcount += map.get("success"); failcount += map.get("fail"); } // 打印执行结果 System.out.println("成功的数量为:" + successcount); System.out.println("失败的数量为:" + failcount); Map<String, Integer> resultmap = new HashMap<String, Integer>(16); resultmap.put("successcount", successcount); resultmap.put("failcount", failcount); return resultmap; } /** * 单个线程要执行的业务操作 循环要操作的数据集合 拼装各个字段(动态生成二维码code,将二维码保存到mongodb服务器) * 将拼装完各字段的数据实体插入到mysql 最后返回操作成功或者失败的条数 * @Title: insertInventDetails * @param data * @return Map<String,Integer> * @author: 高老四 * @since: 2018年10月18日 下午7:22:56 */ public Map<String, Integer> insertInventDetails(List<MerchantCouponInventoryDetail> data) { // 生成12位随机数字 SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyMMddHHmmss"); // 遍历操作数据 for (MerchantCouponInventoryDetail inventory : data) { try { // set各优惠券库存不一样的属性>.. sid,code,二维码,核销连接URL String codeStr = String.format("%06d", inventory.getCode()); String cryptCode = CouponCodeEncryptUtil.encryptData(Long.valueOf(inventory.getCouponSid() + codeStr)); // 生成sid inventory.setSid(UUIDGenerator.generaterLongKeyByNanoTime()); // 生成code inventory.setCryptCode(cryptCode); // 生成核销连接URL以及二维码 // String hexiaocode = PropertiesUtils.getInitValue("wapHomeUrl") + "/card/recharge/"+data.get(0).getUseCode(); String hexiaoUrl = ""; if (MerchantCouponInventoryDetail.COUPON_TYPE_FLOW.equals(inventory.getCouponType())) { hexiaoUrl = PropertiesUtils.getInitValue("wapHomeUrl") + "/coupon/recharge/" + cryptCode; } else { hexiaoUrl = PropertiesUtils.getInitValue("httpHomeUrl") + "/coupon/verify/" + cryptCode; } inventory.setUseLinkUrl(hexiaoUrl); // 保存二维码到mongodb服务器 // byte[] code = QRCodeUtil.buildQRCode(hexiaoUrl); // 保存二维码(没有白边)到mongodb服务器 byte[] code = QRCodeUtil.buildQRCodeNoWhite(hexiaoUrl, null, null, null); String hexiaoQrcode = MongoDbClient.getInstance().writeFile(code); inventory.setUseLinkUrlImg(hexiaoQrcode); } catch (Exception e) { e.printStackTrace(); } } // 批量插入数据 int effectRows = merchantCouponInventoryDetailMapper.insertBatch(data); // 返回操作成功,失败的条数 Map<String, Integer> result = new HashMap<String, Integer>(16); result.put("success", effectRows); result.put("fail", data.size() - effectRows); return result; } } |
以上代码仅供思路参考。 其实关于Java中的Fork Join框架还有很多高深的知识点需要我们理解,这里老四推荐ifeve.com的fork标签,里面提供了的很多优秀的译文和文章深度解读fork/join框架,传送门:
- 《A Java Fork/Join Framework-Doug Lea》(翻译可以参考ifeve.com的Alex翻译的版本详解: 《Java Fork Join 框架》)
- Java中的Fork Join框架之fork标签
- JDK中的Fork Join框架设计值join标签
更博不易,如果觉得文章对你有帮助并且有能力的老铁烦请捐赠盒烟钱,点我去赞助。或者扫描文章下面的微信/支付宝二维码打赏任意金额(点击「给你买杜蕾斯」),也可扫描小站放的支付宝领红包二维码,线下支付享受优惠的同时老四也可以获得对应赏金,老四这里抱拳谢谢诸位了。捐赠时请备注姓名或者昵称,因为您的署名会出现在赞赏列表页面,您的捐赠钱财也会被用于小站的服务器运维上面,再次抱拳感谢。