
What is a RabbitMQ-based remote partition Step in Spring Batch

Updated 2025-04-06. From: SLTechnology News&Howtos (shulou)


What is a remote partition Step based on RabbitMQ in Spring Batch? Many developers who are new to Spring Batch are unsure how it works. This article explains the principle and then walks through a working configuration step by step.

Preface

The example built for this article can run as a master service, as a slave service, or in a mixed master-slave mode, which lets Spring Batch process far more than it can in stand-alone mode.

Project source code: https://gitee.com/kailing/partitionjob

Principle of the Spring Batch remote partition Step

The master node splits the data to be processed into segments according to some rule (for example by ID range or by hash) and publishes a description of each segment to message middleware (ActiveMQ, RabbitMQ). The slave nodes listen on the queue, pick up a message, read the segment it describes, process it, and send the result back to the master.
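The flow described above can be traced with a small in-memory sketch. It is not the project's code and uses no Spring or RabbitMQ classes: two `BlockingQueue` instances stand in for the broker, worker threads stand in for slave nodes, and the class name `RemotePartitionFlowSketch` and its `run` method are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory sketch of the master/slave flow; the queues stand in for the broker. */
class RemotePartitionFlowSketch {

    /**
     * Splits [min, max] into ranges, lets worker threads consume them,
     * and returns one reply per partition as {partitionNumber, idsProcessed}.
     */
    static List<int[]> run(int min, int max, int gridSize, int workers) {
        BlockingQueue<int[]> requests = new LinkedBlockingQueue<>(); // {partition, minValue, maxValue}
        BlockingQueue<int[]> replies = new LinkedBlockingQueue<>();  // {partition, idsProcessed}

        // Master: cut the ID range into segments and publish one request per segment.
        int targetSize = (max - min) / gridSize + 1;
        int sent = 0;
        for (int start = min; start <= max; start += targetSize) {
            int end = Math.min(start + targetSize - 1, max);
            requests.add(new int[] {sent++, start, end});
        }

        // Slaves: each worker takes requests until the queue is empty and replies.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                int[] request;
                while ((request = requests.poll()) != null) {
                    int processed = request[2] - request[1] + 1; // placeholder for real work
                    replies.add(new int[] {request[0], processed});
                }
            });
        }

        // Master: wait for one reply per request it sent.
        List<int[]> collected = new ArrayList<>();
        try {
            for (int i = 0; i < sent; i++) {
                int[] reply = replies.poll(5, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("timed out waiting for a reply");
                }
                collected.add(reply);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for replies", e);
        } finally {
            pool.shutdown();
        }
        return collected;
    }

    public static void main(String[] args) {
        for (int[] reply : run(1, 100, 4, 2)) {
            System.out.println("partition" + reply[0] + " processed " + reply[1] + " ids");
        }
    }
}
```

Replies arrive in whatever order the workers finish, so the master only counts them. In the real setup the broker also lets the workers live in separate processes, which this sketch does not attempt to show.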

The steps below follow this principle to build a complete Spring Batch remote partition example.

The first step is to add the required dependencies.

See: https://gitee.com/kailing/partitionjob/blob/master/pom.xml

The partition job depends mainly on spring-batch-integration, which provides the remote communication capability.

The second step is data distribution on the master node.

    @Profile({"master", "mixed"})
    @Bean
    public Job job(@Qualifier("masterStep") Step masterStep) {
        return jobBuilderFactory.get("endOfDayjob")
                .start(masterStep)
                .incrementer(new BatchIncrementer())
                .listener(new JobListener())
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
                           PartitionHandler partitionHandler,
                           DataSource dataSource) {
        return stepBuilderFactory.get("masterStep")
                // name of the slave step plus the partitioner that splits the data
                .partitioning(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
                .step(slaveStep)
                .partitionHandler(partitionHandler)
                .build();
    }

The key point on the master node is that its Step must be given the name of the slave Step and a data partitioner. The partitioner implements the Partitioner interface, whose partition method returns a Map that fully describes the fragment each slave node has to process. Each entry maps a partition name to an ExecutionContext, and the ExecutionContext holds the data boundaries for that slave node. Which parameters go into the ExecutionContext depends on your business logic. Here the partitions are delimited by data ID. The Partitioner implementation is as follows:

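    import java.util.HashMap;
    import java.util.Map;
    import javax.sql.DataSource;
    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.jdbc.core.JdbcOperations;
    import org.springframework.jdbc.core.JdbcTemplate;

    /**
     * Created by kl on 2018-3-1.
     * Content: partitions the data by ID range
     */
    public class ColumnRangePartitioner implements Partitioner {

        private JdbcOperations jdbcTemplate;

        ColumnRangePartitioner(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from kl_article", Integer.class);
            int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from kl_article", Integer.class);
            int targetSize = (max - min) / gridSize + 1;
            Map<String, ExecutionContext> result = new HashMap<>();
            int number = 0;
            int start = min;
            int end = start + targetSize - 1;
            while (start <= max) {
                // one ExecutionContext per partition, keyed by partition name
                ExecutionContext value = new ExecutionContext();
                result.put("partition" + number, value);
                if (end >= max) {
                    end = max; // clamp the last range to the largest ID
                }
                value.putInt("minValue", start);
                value.putInt("maxValue", end);
                start += targetSize;
                end += targetSize;
                number++;
            }
            return result;
        }
    }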
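To see what the range arithmetic produces, here is a minimal plain-JDK sketch of the same formula, `targetSize = (max - min) / gridSize + 1`. It assumes the loop runs while `start <= max` and clamps the last range to `max`. The class name `RangeSplitSketch` and the `int[]` pairs are illustrative stand-ins for the Spring `ExecutionContext`, and the sample values 1, 100 and 4 are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-JDK sketch of the ID-range arithmetic; no Spring types involved. */
class RangeSplitSketch {

    /** Splits the closed interval [min, max] into at most gridSize contiguous ranges. */
    static Map<String, int[]> split(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1; // IDs per partition
        Map<String, int[]> result = new LinkedHashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max; // clamp the last range to the largest ID
            }
            result.put("partition" + number, new int[] {start, end});
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints partition0 -> [1, 25] through partition3 -> [76, 100]
        split(1, 100, 4).forEach((name, r) ->
                System.out.println(name + " -> [" + r[0] + ", " + r[1] + "]"));
    }
}
```

Because the split is by ID range and not by row count, gaps in the ID sequence can leave some partitions with far fewer rows than others.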

Step 3, Integration configuration

Spring Batch Integration provides the remote partition communication capability. Spring Integration has a rich set of channel adapters (such as JMS and AMQP), so remote partitioning can be built on middleware such as ActiveMQ or RabbitMQ. This article uses RabbitMQ as the communication middleware. Installing RabbitMQ is beyond the scope of this article. The following code shows how to configure the MQ connection as well as the queues, message adapters, and other components related to Spring Batch partitioning.

    /**
     * Created by kl on 2018-3-1.
     * Content: remote partition communication
     */
    @Configuration
    @ConfigurationProperties(prefix = "spring.rabbit")
    public class IntegrationConfiguration {

        private String host;
        private Integer port = 5672;
        private String username;
        private String password;
        private String virtualHost;
        private int connRecvThreads = 5;
        private int channelCacheSize = 10;

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(connRecvThreads);
            executor.initialize();
            connectionFactory.setExecutor(executor);
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setChannelCacheSize(channelCacheSize);
            return connectionFactory;
        }

        @Bean
        public MessagingTemplate messageTemplate() {
            MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
            messagingTemplate.setReceiveTimeout(6000000L);
            return messagingTemplate;
        }

        @Bean
        public DirectChannel outboundRequests() {
            return new DirectChannel();
        }

        @Bean
        @ServiceActivator(inputChannel = "outboundRequests")
        public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
            AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
            endpoint.setExpectReply(true);
            endpoint.setOutputChannel(inboundRequests());
            endpoint.setRoutingKey("partition.requests");
            return endpoint;
        }

        @Bean
        public Queue requestQueue() {
            return new Queue("partition.requests", false);
        }

        @Bean
        @Profile({"slave", "mixed"})
        public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
            AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inboundRequests());
            adapter.afterPropertiesSet();
            return adapter;
        }

        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames("partition.requests");
            container.setAutoStartup(false);
            return container;
        }

        @Bean
        public PollableChannel outboundStaging() {
            return new NullChannel();
        }

        @Bean
        public QueueChannel inboundRequests() {
            return new QueueChannel();
        }
    }

The fourth step is for the slave node to receive the partition information and process it.

    @Bean
    @Profile({"slave", "mixed"})
    @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
    public StepExecutionRequestHandler stepExecutionRequestHandler() {
        StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
        BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
        stepLocator.setBeanFactory(this.applicationContext);
        stepExecutionRequestHandler.setStepLocator(stepLocator);
        stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
        return stepExecutionRequestHandler;
    }

    @Bean("slaveStep")
    public Step slaveStep(MyProcessorItem processorItem, JpaPagingItemReader reader) {
        CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
        List processorList = new ArrayList();
        processorList.add(processorItem);
        itemProcessor.setDelegates(processorList);
        return stepBuilderFactory.get("slaveStep")
                .chunk(1000) // transaction commit batch size
                .reader(reader)
                .processor(itemProcessor)
                .writer(new PrintWriterItem())
                .build();
    }

The most important part of the slave node is the StepExecutionRequestHandler, which receives messages from the MQ middleware. The data boundaries to be processed are then read from the partition information in the step execution context, as the ItemReader shows:
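As a rough illustration of what the slave side does with those boundaries, the following plain-JDK sketch reads `minValue` and `maxValue` from a map standing in for the step execution context, keeps only the IDs inside that range, and groups them into chunks. The class `WorkerRangeSketch`, the in-memory ID list, and the inclusive upper bound are assumptions made for this example; the real reader pages through the database instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-JDK sketch of a worker reading its partition boundaries; no Spring types involved. */
class WorkerRangeSketch {

    /**
     * Selects the IDs that fall inside [minValue, maxValue] (upper bound assumed inclusive)
     * and groups them into chunks of at most chunkSize, in input order.
     */
    static List<List<Integer>> readInChunks(Map<String, Integer> stepContext,
                                            List<Integer> allIds,
                                            int chunkSize) {
        int min = stepContext.get("minValue");
        int max = stepContext.get("maxValue");
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int id : allIds) {
            if (id < min || id > max) {
                continue; // belongs to another partition
            }
            current.add(id);
            if (current.size() == chunkSize) {
                chunks.add(current); // a full chunk is handed on as one unit
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // trailing partial chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            ids.add(i);
        }
        List<List<Integer>> chunks = readInChunks(Map.of("minValue", 26, "maxValue", 50), ids, 10);
        System.out.println(chunks.size() + " chunks, first id " + chunks.get(0).get(0));
    }
}
```

Each worker only ever sees the IDs between its own boundaries, which is what lets several slave nodes process the same table without overlapping.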

    @Bean(destroyMethod = "")
    @StepScope
    public JpaPagingItemReader jpaPagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.err.println("received sharding parameter [" + minValue + " -> " + maxValue + "]");
        JpaPagingItemReader reader = new JpaPagingItemReader();
        JpaNativeQueryProvider queryProvider = new JpaNativeQueryProvider();
        String sql = "select * from kl_article where arcid >= :minValue and arcid

(The listing is cut off at this point in the source.)
