广西中小企业网站建设,南博网站建设,服装建设网站的原因,o2o平台有哪些网站前言
并发编程是一项非常重要的技术#xff0c;无论在面试#xff0c;还是工作中出现的频率非常高。
并发编程说白了就是多线程编程#xff0c;但多线程一定比单线程效率更高#xff1f;
答#xff1a;不一定#xff0c;要看具体业务场景。
毕竟如果使用了多线程无论在面试还是工作中出现的频率非常高。
并发编程说白了就是多线程编程但多线程一定比单线程效率更高
答不一定要看具体业务场景。
毕竟如果使用了多线程那么线程之间的竞争和抢占cpu资源线程的上下文切换也是相对来说比较耗时的操作。
下面这几个问题在面试中你必定遇到过
你在哪来业务场景中使用过多线程怎么用的踩过哪些坑
今天聊聊我之前在项目中用并发编程的12种业务场景给有需要的朋友一个参考。 1. 简单定时任务
各位亲爱的朋友你没看错Thread类真的能做定时任务。如果你看过一些定时任务框架的源码你最后会发现它们的底层也会使用Thread类。
实现这种定时任务的具体代码如下
public static void init() {new Thread(() - {while (true) {try {System.out.println(下载文件);Thread.sleep(1000 * 60 * 5);} catch (Exception e) {log.error(e);}}}).start();
}使用Thread类可以做最简单的定时任务在run方法中有个while的死循环当然还有其他方式执行我们自己的任务。有个需要特别注意的地方是需要用try...catch捕获异常否则如果出现异常就直接退出循环下次将无法继续执行了。
但这种方式做的定时任务只能周期性执行不能支持定时在某个时间点执行。
特别提醒一下该线程建议定义成守护线程可以通过setDaemon方法设置让它在后台默默执行就好。
使用场景比如项目中有时需要每隔5分钟去下载某个文件或者每隔10分钟去读取模板文件生成静态html页面等等一些简单的周期性任务场景。
使用Thread类做定时任务的优缺点
优点这种定时任务非常简单学习成本低容易入手对于那些简单的周期性任务是个不错的选择。缺点不支持指定某个时间点执行任务不支持延迟执行等操作功能过于单一无法应对一些较为复杂的场景。
2.监听器
有时候我们需要写个监听器去监听某些数据的变化。
比如我们在使用canal的时候需要监听binlog的变化能够及时把数据库中的数据同步到另外一个业务数据库中。 如果直接写一个监听器去监听数据就太没意思了我们想实现这样一个功能在配置中心有个开关配置监听器是否开启如果开启了使用单线程异步执行。
主要代码如下
Service
public CanalService {private volatile boolean running false;private Thread thread;Autowiredprivate CanalConnector canalConnector;public void handle() {//连接canalwhile(running) {//业务处理}}public void start() {thread new Thread(this::handle, name);running true;thread.start();}public void stop() {if(!running) {return;}running false;}
}在start方法中开启了一个线程在该线程中异步执行handle方法的具体任务。然后通过调用stop方法可以停止该线程。
其中使用volatile关键字控制的running变量作为开关它可以控制线程中的状态。
接下来有个比较关键的点是如何通过配置中心的配置控制这个开关呢
以apollo配置为例我们在配置中心的后台修改配置之后自动获取最新配置的核心代码如下
public class CanalConfig {Autowiredprivate CanalService canalService;ApolloConfigChangeListenerpublic void change(ConfigChangeEvent event) {String value event.getChange(test.canal.enable).getNewValue();if(BooleanUtils.toBoolean(value)) {canalService.start();} else {canalService.stop();}}
}通过apollo的ApolloConfigChangeListener注解可以监听配置参数的变化。
如果test.canal.enable开关配置的true则调用canalService类的start方法开启canal数据同步功能。如果开关配置的false则调用canalService类的stop方法自动停止canal数据同步功能。
3.收集日志
在某些高并发的场景中我们需要收集部分用户的日志比如用户登录的日志写到数据库中以便于做分析。
但由于项目中还没有引入消息中间件比如kafka、rocketmq等。
如果直接将日志同步写入数据库可能会影响接口性能。
所以大家很自然想到了异步处理。
实现这个需求最简单的做法是开启一个线程异步写入数据到数据库即可。
这样做可以是可以。
但如果用户登录操作的耗时比异步写入数据库的时间要少得多。这样导致的结果是生产日志的速度比消费日志的速度要快得多最终的性能瓶颈在消费端。
其实还有更优雅的处理方式虽说没有使用消息中间件但借用了它的思想。
这套记录登录日志的功能分为日志生产端、日志存储端和日志消费端。
如下图所示
先定义了一个阻塞队列。
Component
public class LoginLogQueue {private static final int QUEUE_MAX_SIZE 1000;private BlockingQueueblockingQueue queue new LinkedBlockingQueue(QUEUE_MAX_SIZE);//生成消息public boolean push(LoginLog loginLog) {return this.queue.add(loginLog);} //消费消息public LoginLog poll() {LoginLog loginLog null;try {loginLog this.queue.take();} catch (InterruptedException e) {e.printStackTrace();}return result;}
}然后定义了一个日志的生产者。
Service
public class LoginSerivce {Autowiredprivate LoginLogQueue loginLogQueue;public int login(UserInfo userInfo) {//业务处理LoginLog loginLog convert(userInfo);loginLogQueue.push(loginLog);}
}接下来定义了日志的消费者。
Service
public class LoginInfoConsumer {Autowiredprivate LoginLogQueue queue;PostConstructpublic voit init {new Thread(() - {while (true) {LoginLog loginLog queue.take();//写入数据库}}).start();}
}当然这个例子中使用单线程接收登录日志为了提升性能也可以使用线程池来处理业务逻辑比如写入数据库等。
4.excel导入
我们可能会经常收到运营同学提过来的excel数据导入需求比如将某一大类下的所有子类一次性导入系统或者导入一批新的供应商数据等等。
我们以导入供应商数据为例它所涉及的业务流程很长比如
调用天眼查接口校验企业名称和统一社会信用代码。写入供应商基本表写入组织表给供应商自动创建一个用户给该用户分配权限自定义域名发站内通知
等等。
如果在程序中解析完excel读取了所有数据之后。用单线程一条条处理业务逻辑可能耗时会非常长。
为了提升excel数据导入效率非常有必要使用多线程来处理。
当然在java中实现多线程的手段有很多种下面重点聊聊java8中最简单的实现方式parallelStream。
伪代码如下
supplierList.parallelStream().forEach(x - importSupplier(x));parallelStream是一个并行执行的流它默认通过ForkJoinPool实现的能提高你的多线程任务的速度。
ForkJoinPool处理的过程会分而治之它的核心思想是将一个大任务切分成多个小任务。每个小任务都能单独执行最后它会把所用任务的执行结果进行汇总。
下面用一张图简单介绍一下ForkJoinPool的原理
当然除了excel导入之外还有类似的读取文本文件也可以用类似的方法处理。 温馨的提醒一下如果一次性导入的数据非常多用多线程处理可能会使系统的cpu使用率飙升需要特别关注。 5.查询接口
很多时候我们需要在某个查询接口中调用其他服务的接口组合数据之后一起返回。
比如有这样的业务场景
在用户信息查询接口中需要返回用户名称、性别、等级、头像、积分、成长值等信息。
而用户名称、性别、等级、头像在用户服务中积分在积分服务中成长值在成长值服务中。为了汇总这些数据统一返回需要另外提供一个对外接口服务。
于是用户信息查询接口需要调用用户查询接口、积分查询接口 和 成长值查询接口然后汇总数据统一返回。
调用过程如下图所示
调用远程接口总耗时 530ms 200ms 150ms 180ms
显然这种串行调用远程接口性能是非常不好的调用远程接口总的耗时为所有的远程接口耗时之和。
那么如何优化远程接口性能呢
既然串行调用多个远程接口性能很差为什么不改成并行呢
如下图所示
调用远程接口总耗时 200ms 200ms即耗时最长的那次远程接口调用
在java8之前可以通过实现Callable接口获取线程返回结果。
java8以后通过CompleteFuture类实现该功能。我们这里以CompleteFuture为例
public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {final UserInfo userInfo new UserInfo();CompletableFuture userFuture CompletableFuture.supplyAsync(() - {getRemoteUserAndFill(id, userInfo);return Boolean.TRUE;}, executor);CompletableFuture bonusFuture CompletableFuture.supplyAsync(() - {getRemoteBonusAndFill(id, userInfo);return Boolean.TRUE;}, executor);CompletableFuture growthFuture CompletableFuture.supplyAsync(() - {getRemoteGrowthAndFill(id, userInfo);return Boolean.TRUE;}, executor);CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();userFuture.get();bonusFuture.get();growthFuture.get();return userInfo;
}温馨提醒一下这两种方式别忘了使用线程池。示例中我用到了executor表示自定义的线程池为了防止高并发场景下出现线程过多的问题。
6.获取用户上下文
不知道你在项目开发时有没有遇到过这样的需求用户登录之后在所有的请求接口中通过某个公共方法就能获取到当前登录用户的信息
获取的用户上下文我们以CurrentUser为例。
CurrentUser内部包含了一个ThreadLocal对象它负责保存当前线程的用户上下文信息。当然为了保证在线程池中也能从用户上下文中获取到正确的用户信息这里用了阿里的TransmittableThreadLocal。伪代码如下
Data
public class CurrentUser {private static final TransmittableThreadLocalCurrentUser THREA_LOCAL new TransmittableThreadLocal();private String id;private String userName;private String password;private String phone;...public statis void set(CurrentUser user) {THREA_LOCAL.set(user);}public static void getCurrent() {return THREA_LOCAL.get();}
}这里为什么用了阿里的TransmittableThreadLocal而不是普通的ThreadLocal呢在线程池中由于线程会被多次复用导致从普通的ThreadLocal中无法获取正确的用户信息。父线程中的参数没法传递给子线程而TransmittableThreadLocal很好解决了这个问题。
然后在项目中定义一个全局的spring mvc拦截器专门设置用户上下文到ThreadLocal中。伪代码如下
public class UserInterceptor extends HandlerInterceptorAdapter {Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {CurrentUser user getUser(request);if(Objects.nonNull(user)) {CurrentUser.set(user);}}
}用户在请求我们接口时会先触发该拦截器它会根据用户cookie中的token调用调用接口获取redis中的用户信息。如果能获取到说明用户已经登录则把用户信息设置到CurrentUser类的ThreadLocal中。
接下来在api服务的下层即business层的方法中就能轻松通过CurrentUser.getCurrent();方法获取到想要的用户上下文信息了。
这套用户体系的想法是很good的但深入使用后发现了一个小插曲
api服务和mq消费者服务都引用了business层business层中的方法两个服务都能直接调用。
我们都知道在api服务中用户是需要登录的而mq消费者服务则不需要登录。
如果business中的某个方法刚开始是给api开发的在方法深处使用了CurrentUser.getCurrent();获取用户上下文。但后来某位新来的帅哥在mq消费者中也调用了那个方法并未发觉这个小机关就会中招出现找不到用户上下文的问题。
所以我当时的第一个想法是代码没做兼容处理因为之前这类问题偶尔会发生一次。
想要解决这个问题其实也很简单。只需先判断一下能否从CurrentUser中获取用户信息如果不能则取配置的系统用户信息。伪代码如下
Autowired
private BusinessConfig businessConfig;CurrentUser user CurrentUser.getCurrent();
if(Objects.nonNull(user)) {entity.setUserId(user.getUserId());entity.setUserName(user.getUserName());
} else {entity.setUserId(businessConfig.getDefaultUserId());entity.setUserName(businessConfig.getDefaultUserName());
}这种简单无公害的代码如果只是在一两个地方加还OK。
此外众所周知SimpleDateFormat在java8以前是用来处理时间的工具类它是非线程安全的。也就是说用该方法解析日期会有线程安全问题。
为了避免线程安全问题的出现我们可以把SimpleDateFormat对象定义成局部变量。但如果你一定要把它定义成静态变量可以使用ThreadLocal保存日期也能解决线程安全问题。
8. 传递参数
之前见过有些同事写代码时一个非常有趣的用法即使用MDC传递参数。
MDC是什么
MDC是org.slf4j包下的一个类它的全称是Mapped Diagnostic Context我们可以认为它是一个线程安全的存放诊断日志的容器。
MDC的底层是用了ThreadLocal来保存数据的。
例如现在有这样一种场景我们使用RestTemplate调用远程接口时有时需要在header中传递信息比如traceIdsource等便于在查询日志时能够串联一次完整的请求链路快速定位问题。
这种业务场景就能通过ClientHttpRequestInterceptor接口实现具体做法如下
第一步定义一个LogFilter拦截所有接口请求在MDC中设置traceId
public class LogFilter implements Filter {Overridepublic void init(FilterConfig filterConfig) throws ServletException {}Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {MdcUtil.add(UUID.randomUUID().toString());System.out.println(记录请求日志);chain.doFilter(request, response);System.out.println(记录响应日志);}Overridepublic void destroy() {}
}第二步实现ClientHttpRequestInterceptor接口MDC中获取当前请求的traceId然后设置到header中
public class RestTemplateInterceptor implements ClientHttpRequestInterceptor {Overridepublic ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {request.getHeaders().set(traceId, MdcUtil.get());return execution.execute(request, body);}
}第三步定义配置类配置上面定义的RestTemplateInterceptor类
Configuration
public class RestTemplateConfiguration {Beanpublic RestTemplate restTemplate() {RestTemplate restTemplate new RestTemplate();restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));return restTemplate;}Beanpublic RestTemplateInterceptor restTemplateInterceptor() {return new RestTemplateInterceptor();}
}其中MdcUtil其实是利用MDC工具在ThreadLocal中存储和获取traceId
public class MdcUtil {private static final String TRACE_ID TRACE_ID;public static String get() {return MDC.get(TRACE_ID);}public static void add(String value) {MDC.put(TRACE_ID, value);}
}当然这个例子中没有演示MdcUtil类的add方法具体调的地方我们可以在filter中执行接口方法之前生成traceId调用MdcUtil类的add方法添加到MDC中然后在同一个请求的其他地方就能通过MdcUtil类的get方法获取到该traceId。
能使用MDC保存traceId等参数的根本原因是用户请求到应用服务器Tomcat会从线程池中分配一个线程去处理该请求。
那么该请求的整个过程中保存到MDC的ThreadLocal中的参数也是该线程独享的所以不会有线程安全问题。
9. 模拟高并发
有时候我们写的接口在低并发的场景下一点问题都没有。
但如果一旦出现高并发调用该接口可能会出现一些意想不到的问题。
为了防止类似的事情发生一般在项目上线前我们非常有必要对接口做一下压力测试。
当然现在已经有比较成熟的压力测试工具比如Jmeter、LoadRunner等。
如果你觉得下载压测工具比较麻烦也可以手写一个简单的模拟并发操作的工具用CountDownLatch就能实现例如
public static void concurrenceTest() {/*** 模拟高并发情况代码*/final AtomicInteger atomicInteger new AtomicInteger(0);final CountDownLatch countDownLatch new CountDownLatch(1000); // 相当于计数器当所有都准备好了再一起执行模仿多并发保证并发量final CountDownLatch countDownLatch2 new CountDownLatch(1000); // 保证所有线程执行完了再打印atomicInteger的值ExecutorService executorService Executors.newFixedThreadPool(10);try {for (int i 0; i 1000; i) {executorService.submit(new Runnable() {Overridepublic void run() {try {countDownLatch.await(); //一直阻塞当前线程直到计时器的值为0,保证同时并发} catch (InterruptedException e) {log.error(e.getMessage(),e);}//每个线程增加1000次每次加1for (int j 0; j 1000; j) {atomicInteger.incrementAndGet();}countDownLatch2.countDown();}});countDownLatch.countDown();}countDownLatch2.await();// 保证所有线程执行完executorService.shutdown();} catch (Exception e){log.error(e.getMessage(),e);}
}10. 处理mq消息
在高并发的场景中消息积压问题可以说如影随形真的没办法从根本上解决。表面上看已经解决了但后面不知道什么时候就会冒出一次比如这次
有天下午产品过来说有几个商户投诉过来了他们说菜品有延迟快查一下原因。
这次问题出现得有点奇怪。
为什么这么说
首先这个时间点就有点奇怪平常出问题不都是中午或者晚上用餐高峰期吗怎么这次问题出现在下午
根据以往积累的经验我直接看了kafka的topic的数据果然上面消息有积压但这次每个partition都积压了十几万的消息没有消费比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。
我赶紧查服务监控看看消费者挂了没还好没挂。又查服务日志没有发现异常。这时我有点迷茫碰运气问了问订单组下午发生了什么事情没他们说下午有个促销活动跑了一个JOB批量更新过有些商户的订单信息。
这时我一下子如梦初醒是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢实在太坑了。
虽说知道问题的原因了倒是眼前积压的这十几万的消息该如何处理呢
此时如果直接调大partition数量是不行的历史消息已经存储到4个固定的partition只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。
直接加服务节点也不行因为kafka允许同组的多个partition被一个consumer消费但不允许一个partition被同组的多个consumer消费可能会造成资源浪费。
看来只有用多线程处理了。
为了紧急解决问题我改成了用线程池处理消息核心线程和最大线程数都配置成了50。
大致用法如下
先定义一个线程池
Configuration
public class ThreadPoolConfig {Value(${thread.pool.corePoolSize:5})private int corePoolSize;Value(${thread.pool.maxPoolSize:10})private int maxPoolSize;Value(${thread.pool.queueCapacity:200})private int queueCapacity;Value(${thread.pool.keepAliveSeconds:30})private int keepAliveSeconds;Value(${thread.pool.threadNamePrefix:ASYNC_})private String threadNamePrefix;Bean(messageExecutor)public Executor messageExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);executor.setThreadNamePrefix(threadNamePrefix);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}再定义一个消息的consumer
Service
public class MyConsumerService {Autowiredprivate Executor messageExecutor;KafkaListener(idtest,topics{topic-test})public void listen(String message){System.out.println(收到消息 message);messageExecutor.submit(new MyWork(message);}
}在定义的Runable实现类中处理业务逻辑
public class MyWork implements Runnable {private String message;public MyWork(String message) {this.message message;}Overridepublic void run() {System.out.println(message);}
}果然调整之后消息积压数量确实下降的非常快大约半小时后积压的消息就非常顺利的处理完了。
但此时有个更严重的问题出现我收到了报警邮件有两个订单系统的节点down机了。。。
11. 统计数量
在多线程的场景中有时候需要统计数量比如用多线程导入供应商数据时统计导入成功的供应商数有多少。
如果这时候用count统计次数最终的结果可能会不准。因为count并非原子操作如果多个线程同时执行该操作则统计的次数可能会出现异常。
为了解决这个问题就需要使用concurent的atomic包下面的类比如AtomicInteger、AtomicLong等。
Servcie
public class ImportSupplierService {private static AtomicInteger count new AtomicInteger(0);public int importSupplier(ListSupplierInfo supplierList) {if(CollectionUtils.isEmpty(supplierList)) {return 0;}supplierList.parallelStream().forEach(x - {try {importSupplier(x);count.addAndGet(1);} catch(Exception e) {log.error(e.getMessage(),e);});return count.get();}
}AtomicInteger的底层说白了使用自旋锁CAS。
public final int incrementAndGet() {for (;;) {int current get();int next current 1;if (compareAndSet(current, next))return next;}
}自旋锁说白了就是一个死循环。
而CAS是比较和交换的意思。
它的实现逻辑是将内存位置处的旧值与预期值进行比较若相等则将内存位置处的值替换为新值。若不相等则不做任何操作。
12. 延迟定时任务
我们经常有延迟处理数据的需求比如如果用户下单后超过30分钟还未完成支付则系统自动将该订单取消。
这里需求就可以使用延迟定时任务实现。
ScheduledExecutorService是JDK1.5版本引进的定时任务该类位于java.util.concurrent并发包下。
ScheduledExecutorService是基于多线程的设计的初衷是为了解决Timer单线程执行多个任务之间会互相影响的问题。
它主要包含4个方法
schedule(Runnable command,long delay,TimeUnit unit)带延迟时间的调度只执行一次调度之后可通过Future.get()阻塞直至任务执行完毕。schedule(Callablecallable,long delay,TimeUnit unit)带延迟时间的调度只执行一次调度之后可通过Future.get()阻塞直至任务执行完毕并且可以获取执行结果。scheduleAtFixedRate表示以固定频率执行的任务如果当前任务耗时较多超过定时周期period则当前任务结束后会立即执行。scheduleWithFixedDelay表示以固定延时执行任务延时是相对当前任务结束为起点计算开始时间。
实现这种定时任务的具体代码如下
public class ScheduleExecutorTest {public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService Executors.newScheduledThreadPool(5);scheduledExecutorService.scheduleAtFixedRate(() - {System.out.println(doSomething);},1000,1000, TimeUnit.MILLISECONDS);}
}调用ScheduledExecutorService类的scheduleAtFixedRate方法实现周期性任务每隔1秒钟执行一次每次延迟1秒再执行。
这种定时任务是阿里巴巴开发者规范中用来替代Timer类的方案对于多线程执行周期性任务是个不错的选择。
使用ScheduledExecutorService类做延迟定时任务的优缺点
优点基于多线程的定时任务多个任务之间不会相关影响支持周期性的执行任务并且带延迟功能。缺点不支持一些较复杂的定时规则。
当然你也可以使用分布式定时任务比如xxl-job或者elastic-job等等。
其实在实际工作中我使用多线程的场景远远不只这12种在这里只是抛砖引玉介绍了一些我认为比较常见的业务场景。