资讯详情

【spring】spring异步执行的使用与源码分析

在实际开发过程中,一些业务逻辑通过异步处理更为合理。例如,在业务逻辑中,需要存储一些数据redis在缓存中,此操作只是一个辅助功能,成功或失败不会对主营业务产生根本影响,可以通过异步进行。

Spring在方法上设置@Async注意,可以使方法异步调用。也就是说,该方法在调用时会立即返回,并将该方法的实际执行交给它Spring的TaskExecutor去完成。

使用异步执行

配置类

使用@EnableAsync注解异步功能。

package com.morris.spring.config;  import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  import java.util.concurrent.Executor;  @Configuration @EnableAsync // 开启Async public class AsyncConfig implements AsyncConfigurer { 
           @Override  public Executor getAsyncExecutor() { 
           // 定制线程池   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();   executor.setCorePoolSize(2);   executor.setMaxPoolSize(4);   executor.setQueueCapacity(10);   executor.setThreadNamePrefix("MyExecutor-");   exector.initialize();
		return executor;
	}

}

service层的使用

在需要异步执行的方法上面加上@Async注解。

package com.morris.spring.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Slf4j
public class AsyncService { 
        

	@Async
	public void noResult() { 
        
		log.info("execute noResult");
	}

	@Async
	public Future<String> hasResult() throws InterruptedException { 
        
		log.info("execute hasResult");
		TimeUnit.SECONDS.sleep(5);
		return new AsyncResult<>("hasResult success");
	}

	@Async
	public CompletableFuture<String> completableFuture() throws InterruptedException { 
        
		log.info(" execute completableFuture");
		TimeUnit.SECONDS.sleep(5);
		return CompletableFuture.completedFuture("completableFuture success");
	}

}

测试类

package com.morris.spring.demo.async;

import com.morris.spring.config.AsyncConfig;
import com.morris.spring.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** * 异步调用的演示 */
@Slf4j
public class AsyncDemo { 
        
	@Test
	public void test() throws ExecutionException, InterruptedException { 
        
		AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
		applicationContext.register(AsyncService.class);
		applicationContext.register(AsyncConfig.class);
		applicationContext.refresh();

		AsyncService asyncService = applicationContext.getBean(AsyncService.class);
		asyncService.noResult(); // 无结果

		Future<String> future = asyncService.hasResult();
		log.info("hasResult: {}", future.get()); // 有结果

		CompletableFuture<String> completableFuture = asyncService.completableFuture();
		completableFuture.thenAcceptAsync(System.out::println);// 异步回调
		log.info("completableFuture call down");
	}
}

运行结果如下:

INFO  MyExecutor-1 AsyncService:16 - execute noResult
INFO  MyExecutor-2 AsyncService:21 - execute hasResult
INFO  main AsyncDemo:29 - hasResult: hasResult success
INFO  MyExecutor-1 AsyncService:28 -  execute completableFuture
INFO  main AsyncDemo:33 - completableFuture call down

通过日志可以发现AsyncService的方法都是通过线程名为MyExecutor-1的线程执行的,这个名称的前缀是在AsyncConfig中指定的,而不是通过main线程执行的。

两个疑问:

源码分析

@EnableAsync

@EnableAsync主要是向Spring容器中导入了AsyncConfigurationSelector类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync { 
        

AsyncConfigurationSelector

AsyncConfigurationSelector的主要方法当然是selectImports(),注意这里会先调用父类的selectImports() org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)

public final String[] selectImports(AnnotationMetadata importingClassMetadata) { 
        
	Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
	Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

	AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
	if (attributes == null) { 
        
		throw new IllegalArgumentException(String.format(
				"@%s is not present on importing class '%s' as expected",
				annType.getSimpleName(), importingClassMetadata.getClassName()));
	}

	AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
	// 模板方法模式,回调子类的selectImports
	String[] imports = selectImports(adviceMode);
	if (imports == null) { 
        
		throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
	}
	return imports;
}

org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports

public String[] selectImports(AdviceMode adviceMode) { 
        
	switch (adviceMode) { 
        
		case PROXY:
			// 奇怪???@Transaction、@EnableCaching都是注入两个类,一个config,一个registrar导入aop的入口类
			// 而这里只有一个config类ProxyAsyncConfiguration
			return new String[] { 
        ProxyAsyncConfiguration.class.getName()};
		case ASPECTJ:
			return new String[] { 
        ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
		default:
			return null;
	}
}

AsyncConfigurationSelector又导入了配置类ProxyAsyncConfiguration。

ProxyAsyncConfiguration

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { 
        

	/** * 先看父类AbstractAsyncConfiguration * @return */
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() { 
        
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		// 实例化AsyncAnnotationBeanPostProcessor
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { 
        
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}

ProxyAsyncConfiguration向容器中注入了一个AsyncAnnotationBeanPostProcessor。

疑问:这里为啥是BeanPostProcessor,不应该像事务切面或者缓存切面一样,注入一个Advisor和XxxxInterceptor(Advice)吗?

AbstractAsyncConfiguration

AbstractAsyncConfiguration是ProxyAsyncConfiguration的父类。

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware { 
        

	@Nullable
	protected AnnotationAttributes enableAsync;

	@Nullable
	protected Supplier<Executor> executor;

	@Nullable
	protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

	/** * 实现了ImportAware.setImportMetadata * 在ProxyAsyncConfiguration初始化后被调用 * @param importMetadata */
	@Override
	public void setImportMetadata(AnnotationMetadata importMetadata) { 
        
		// 取得@EnableAsync注解
		this.enableAsync = AnnotationAttributes.fromMap(
				importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
		if (this.enableAsync == null) { 
        
			throw new IllegalArgumentException(
					"@EnableAsync is not present on importing class " + importMetadata.getClassName());
		}
	}

	/** * Collect any {@link AsyncConfigurer} beans through autowiring. */
	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) { 
        
		// configurers默认为空,除非手动注入AsyncConfigurer
		if (CollectionUtils.isEmpty(configurers)) { 
        
			return;
		}
		if (configurers.size() > 1) { 
        
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}

}

从这里可以看出,可以通过向spring容器中注入AsyncConfigurer来指定执行异步任务的线程池和异常处理器。

AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor的继承结构图: 20220424174516480.png AsyncAnnotationBeanPostProcessor主要实现了BeanFactoryAware和BeanPostProcessor接口。

org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory

public void setBeanFactory(BeanFactory beanFactory) { 
        
	super.setBeanFactory(beanFactory);

	// 实例化Advisor
	AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
	if (this.asyncAnnotationType != null) { 
        
		advisor.setAsyncAnnotationType(this.asyncAnnotationType);
	}
	advisor.setBeanFactory(beanFactory);
	this.advisor = advisor;
}

在AsyncAnnotationBeanPostProcessor实例化时实例化了切面AsyncAnnotationAdvisor。

每个bean实例化完后都会调用AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization()判断是否要生成代理对象。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) { 
        
	... ...
	/** * @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String) */
	// isEligible会判断哪些bean要生成代理
	// 就是使用advisor中的pointcut进行匹配
	if (isEligible(bean, beanName)) { 
        
		// 创建代理
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) { 
        
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);
		return proxyFactory.getProxy(getProxyClassLoader());
	}
	// No proxy needed.
	return bean;
}

AsyncAnnotationAdvisor

切面AsyncAnnotationAdvisor包括通知AnnotationAsyncExecutionInterceptor和切点ComposablePointcut。

public AsyncAnnotationAdvisor(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { 
        

	Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
	asyncAnnotationTypes.add(Async.class);
	try { 
        
		asyncAnnotationTypes.add((Class<? extends Annotation>)
				ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
	}
	catch (ClassNotFoundException ex) { 
        
		// If EJB 3.1 API not present, simply ignore.
	}
	this.advice = buildAdvice(executor, exceptionHandler); // 创建AnnotationAsyncExecutionInterceptor
	this.pointcut = buildPointcut(asyncAnnotationTypes); // 创建ComposablePointcut
}

protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { 
        
	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
	interceptor.configure(executor, exceptionHandler);
	return interceptor;
}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { 
        
	ComposablePointcut result = null;
	for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { 
        
		Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // 类
		Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // 方法
		if (result == null) { 
        
			result = new ComposablePointcut(cpc);
		}
		else { 
        
			result.union(cpc); // 类和方法的组合切点
		}
		result = result.union(mpc);
	}
	return (result != null ? result : Pointcut.TRUE);
}

AnnotationMatchingPointcut切面其实就是查看类或者方法上面有没有@Async注解。

AnnotationAsyncExecutionInterceptor

AnnotationAsyncExecutionInterceptor类主要负责增强逻辑的实现。

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

public Object invoke(final MethodInvocation invocation) throws Throwable { 
        
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : 

标签: 对射光电开关传感器感应开关excpc电感式传感器发射线圈

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台