spring batch中基于RabbitMQ远程分区Step是怎样的,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
十年的乐昌网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。营销型网站的优势是能够根据用户设备显示端的尺寸不同,自动调整乐昌建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联公司从事“乐昌网站设计”,“乐昌网站推广”以来,每个客户项目都认真落实执行。
前言碎语
小编构建的实例可为主服务,从服务,主从混用等模式,可以大大提高spring batch在单机处理时的时效。
项目源码:https://gitee.com/kailing/partitionjob
spring batch远程分区Step的原理
master节点将数据根据相关逻辑(ID,hash),拆分成一段一段要处理的数据集,然后将数据集放到消息中间件中(ActiveMQ,RabbitMQ ),从节点监听到消息,获取消息,读取消息中的数据集处理并发回结果。如下图:

下面按原理分步骤实施,完成springbatch的远程分区实例
第一步,首先引入相关依赖
见:https://gitee.com/kailing/partitionjob/blob/master/pom.xml
分区job主要依赖为:spring-batch-integration,提供了远程通讯的能力
第二步,Master节点数据分发
@Profile({"master", "mixed"})
@Bean
public Job job(@Qualifier("masterStep") Step masterStep) {
return jobBuilderFactory.get("endOfDayjob")
.start(masterStep)
.incrementer(new BatchIncrementer())
.listener(new JobListener())
.build();
}
@Bean("masterStep")
public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
PartitionHandler partitionHandler,
DataSource dataSource) {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
.step(slaveStep)
.partitionHandler(partitionHandler)
.build();
}master节点关键部分是,他的Step需要设置从节点Step的Name,和一个数据分区器,数据分区器需要实现Partitioner接口,它返回一个Map
/**
* Created by kl on 2018/3/1.
* Content :根据数据ID分片
*/
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
ColumnRangePartitioner(DataSource dataSource){
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map partition(int gridSize) {
int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from kl_article", Integer.class);
int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from kl_article", Integer.class);
int targetSize = (max - min) / gridSize + 1;
Map result = new HashMap();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
}
} 第三步,Integration配置
spring batch Integration提供了远程分区通讯能力,Spring Integration拥有丰富的通道适配器(例如JMS和AMQP),基于ActiveMQ,RabbitMQ等中间件都可以实现远程分区处理。本文使用RabbitMQ来做为通讯的中间件。关于RabbitMQ的安装等不在本篇范围,下面代码描述了如何配置MQ连接,以及spring batch分区相关队列,消息适配器等。
/**
* Created by kl on 2018/3/1.
* Content :远程分区通讯
*/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbit")
public class IntegrationConfiguration {
private String host;
private Integer port=5672;
private String username;
private String password;
private String virtualHost;
private int connRecvThreads=5;
private int channelCacheSize=10;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(connRecvThreads);
executor.initialize();
connectionFactory.setExecutor(executor);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setChannelCacheSize(channelCacheSize);
return connectionFactory;
}
@Bean
public MessagingTemplate messageTemplate() {
MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
messagingTemplate.setReceiveTimeout(60000000l);
return messagingTemplate;
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundRequests")
public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
endpoint.setExpectReply(true);
endpoint.setOutputChannel(inboundRequests());
endpoint.setRoutingKey("partition.requests");
return endpoint;
}
@Bean
public Queue requestQueue() {
return new Queue("partition.requests", false);
}
@Bean
@Profile({"slave","mixed"})
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inboundRequests());
adapter.afterPropertiesSet();
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("partition.requests");
container.setAutoStartup(false);
return container;
}
@Bean
public PollableChannel outboundStaging() {
return new NullChannel();
}
@Bean
public QueueChannel inboundRequests() {
return new QueueChannel();
}第四步,从节点接收分区信息并处理
@Bean
@Profile({"slave","mixed"})
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
stepLocator.setBeanFactory(this.applicationContext);
stepExecutionRequestHandler.setStepLocator(stepLocator);
stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
return stepExecutionRequestHandler;
}
@Bean("slaveStep")
public Step slaveStep(MyProcessorItem processorItem,
JpaPagingItemReader reader) {
CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
List processorList = new ArrayList<>();
processorList.add(processorItem);
itemProcessor.setDelegates(processorList);
return stepBuilderFactory.get("slaveStep")
.chunk(1000)//事务提交批次
.reader(reader)
.processor(itemProcessor)
.writer(new PrintWriterItem())
.build();
} 从节点最关键的地方在于StepExecutionRequestHandler,他会接收MQ消息中间件中的消息,并从分区信息中获取到需要处理的数据边界,如下ItemReader:
@Bean(destroyMethod = "") @StepScope public JpaPagingItemReaderjpaPagingItemReader( @Value("#{stepExecutionContext['minValue']}") Long minValue, @Value("#{stepExecutionContext['maxValue']}") Long maxValue) { System.err.println("接收到分片参数["+minValue+"->"+maxValue+"]"); JpaPagingItemReader reader = new JpaPagingItemReader<>(); JpaNativeQueryProvider queryProvider = new JpaNativeQueryProvider<>(); String sql = "select * from kl_article where arcid >= :minValue and arcid <= :maxValue"; queryProvider.setSqlQuery(sql); queryProvider.setEntityClass(Article.class); reader.setQueryProvider(queryProvider); Map queryParames= new HashMap(); queryParames.put("minValue",minValue); queryParames.put("maxValue",maxValue); reader.setParameterValues(queryParames); reader.setEntityManagerFactory(entityManagerFactory); return reader; }
中的minValuemin,maxValue,正是前文中Master节点分区中设置的值
如上,已经完成了整个spring batch 远程分区处理的实例,需要注意的是,一个实例,即可主可从可主从,是有spring profile来控制的,细心的人可能会发现@Profile({"master", "mixed"})等注解,所以如果你在测试的时候,别忘了在spring boot中配置好spring.profiles.active=slave等。
看完上述内容,你们掌握spring batch中基于RabbitMQ远程分区Step是怎样的的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
网页标题:springbatch中基于RabbitMQ远程分区Step是怎样的
当前网址:http://www.jxjierui.cn/article/gspcpc.html


咨询
建站咨询
