Spring批处理示例
欢迎使用Spring Batch示例。
Spring Batch是用于执行批处理作业的Spring框架模块。
我们可以使用spring batch来处理一系列作业。
Spring批处理示例
在进行Spring批处理示例程序之前,让我们对Spring批处理术语有所了解。
- 一项工作可以包含" n"个步骤。 
 每个步骤都包含Read-Process-Write任务,或者它可以具有单个操作,称为tasklet。
- 基本上从诸如数据库,CSV等之类的源读取Read-Process-Write,然后处理数据并将其写入诸如数据库,CSV,XML等之类的源。 
- Tasklet意味着执行单个任务或者操作,例如清理连接,在处理完成后释放资源。 
- 可以将Read-Process-Write和Tasklet链接在一起以运行作业。 
Spring批处理示例
让我们考虑一个实现Spring Batch的可行示例。
为了实现目的,我们将考虑以下情形。
包含数据的CSV文件需要与数据一起转换为XML,并且标记将以列名命名。
以下是用于Spring批处理示例的重要工具和库。
- Apache Maven 3.5.0 –用于项目构建和依赖项管理。 
- Eclipse Oxygen版本4.7.0 –用于创建spring batch maven应用程序的IDE。 
- Java 1.8 
- Spring Core 4.3.12。RELEASE 
- Spring OXM 4.3.12。RELEASE 
- Spring JDBC 4.3.12。RELEASE 
- Spring Batch 3.0.8。RELEASE 
- MySQL Java驱动程序5.1.25 –根据您的MySQL安装使用。这是Spring Batch元数据表所必需的。 
Spring Batch示例目录结构
下图说明了我们的Spring Batch示例项目中的所有组件。
Spring Batch Maven依赖关系
以下是pom.xml文件的内容,其中包含我们的Spring Batch示例项目所需的所有必需依赖项。
<project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.theitroad.spring</groupId>
	<artifactId>SpringBatchExample</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>SpringBatchDemo</name>
	<url>https://maven.apache.org</url>
	<properties>
		<jdk.version>1.8</jdk.version>
		<spring.version>4.3.12.RELEASE</spring.version>
		<spring.batch.version>3.0.8.RELEASE</spring.batch.version>
		<mysql.driver.version>5.1.25</mysql.driver.version>
		<junit.version>4.11</junit.version>
	</properties>
	<dependencies>
		<!-- Spring Core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- Spring jdbc, for database -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- Spring XML to/back object -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- MySQL database driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql.driver.version}</version>
		</dependency>
		<!-- Spring Batch dependencies -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-infrastructure</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<!-- Spring Batch unit test -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<!-- Junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.thoughtworks.xstream</groupId>
			<artifactId>xstream</artifactId>
			<version>1.4.10</version>
		</dependency>
	</dependencies>
	<build>
		<finalName>spring-batch</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-eclipse-plugin</artifactId>
				<version>2.9</version>
				<configuration>
					<downloadSources>true</downloadSources>
					<downloadJavadocs>false</downloadJavadocs>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>${jdk.version}</source>
					<target>${jdk.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
Spring Batch处理CSV输入文件
这是用于Spring批处理的示例CSV文件的内容。
1001,Tom,Moody, 29/7/2013 1002,John,Parker, 30/7/2013 1003,Henry,Williams, 31/7/2013
Spring Batch作业配置
我们必须在配置文件中定义spring bean和spring批处理作业。
以下是" job-batch-demo.xml"文件的内容,它是Spring批处理项目中最重要的部分。
<beans xmlns="https://www.springframework.org/schema/beans" xmlns:batch="https://www.springframework.org/schema/batch" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch-3.0.xsd https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans-4.3.xsd "> <import resource="../config/context.xml" <import resource="../config/database.xml" <bean id="report" class="com.theitroad.spring.model.Report" scope="prototype" <bean id="itemProcessor" class="com.theitroad.spring.CustomItemProcessor" <batch:job id="DemoJobXMLWriter"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="csvFileItemReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="classpath:csv/input/report.csv" <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="names" value="id,firstname,lastname,dob" </bean> </property> <property name="fieldSetMapper"> <bean class="com.theitroad.spring.ReportFieldSetMapper" <!-- if no data type conversion, use BeanWrapperFieldSetMapper to map by name <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"> <property name="prototypeBeanName" value="report" </bean> --> </property> </bean> </property> </bean> <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"> <property name="resource" value="file:xml/outputs/report.xml" <property name="marshaller" ref="reportMarshaller" <property name="rootTagName" value="report" </bean> <bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller"> <property name="classesToBeBound"> <list> <value>com.theitroad.spring.model.Report</value> </list> </property> </bean> </beans>
- 我们正在使用FlatFileItemReader读取CSV文件,使用CustomItemProcessor处理数据并使用StaxEventItemWriter写入XML文件。 
- batch:job–这个标签定义了我们要创建的工作。
 Id属性指定作业的ID。
 我们可以在一个xml文件中定义多个作业。
- batch:step –此标签用于定义弹簧批处理作业的不同步骤。 
- Spring Batch Framework提供了两种不同类型的处理样式,分别是"面向TaskletStep"和"面向块"。 
 本示例中使用的"块导向"样式指的是在事务边界内一次读取数据并创建要写入的"块"。
- reader:用于读取数据的spring bean。 
 在这个例子中,我们使用了- csvFileItemReaderbean,它是- FlatFileItemReader的实例。
- 处理器:这是用于处理数据的类。 
 在此示例中,我们使用了" CustomItemProcessor"。
- writer:用于将数据写入xml文件的bean。 
- commit-interval:此属性定义处理完成后将提交的块的大小。 
 基本上,这意味着ItemReader将一一读取数据,ItemProcessor也将以相同的方式处理数据,但ItemWriter仅在等于commit-interval大小时才写入数据。
- 此项目中使用的三个重要接口是来自org.springframework.batch.item包的ItemReader,ItemProcessor和ItemWriter。 
Spring批模型类
首先,我们将CSV文件读入java对象,然后使用JAXB将其写入xml文件。
下面是带有必需的JAXB批注的模型类。
package com.theitroad.spring.model;
import java.util.Date;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "record")
public class Report {
	private int id;
	private String firstName;
	private String lastName;
	private Date dob;
	@XmlAttribute(name = "id")
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	@XmlElement(name = "firstname")
	public String getFirstName() {
		return firstName;
	}
	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}
	@XmlElement(name = "lastname")
	public String getLastName() {
		return lastName;
	}
	public void setLastName(String lastName) {
		this.lastName = lastName;
	}
	@XmlElement(name = "dob")
	public Date getDob() {
		return dob;
	}
	public void setDob(Date dob) {
		this.dob = dob;
	}
	@Override
	public String toString() {
		return "Report [id=" + id + ", firstname=" + firstName + ", lastName=" + lastName + ", DateOfBirth=" + dob
				+ "]";
	}
}
请注意,模型类字段应与Spring Batch Mapper配置中定义的字段相同,即本例中的"属性名称="名称"值=" id,名字,姓氏,标识"。
Spring Batch FieldSetMapper
需要自定义FieldSetMapper来转换日期。
如果不需要数据类型转换,则仅应使用BeanWrapperFieldSetMapper来按名称自动映射值。
扩展FieldSetMapper的Java类是ReportFieldSetMapper。
package com.theitroad.spring;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.theitroad.spring.model.Report;
public class ReportFieldSetMapper implements FieldSetMapper<Report> {
	private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
	public Report mapFieldSet(FieldSet fieldSet) throws BindException {
		Report report = new Report();
		report.setId(fieldSet.readInt(0));
		report.setFirstName(fieldSet.readString(1));
		report.setLastName(fieldSet.readString(2));
		//default format yyyy-MM-dd
		//fieldSet.readDate(4);
		String date = fieldSet.readString(3);
		try {
			report.setDob(dateFormat.parse(date));
		} catch (ParseException e) {
			e.printStackTrace();
		}
		return report;
	}
}
Spring批项目处理器
现在,按照作业配置中的定义,将在itemWriter之前触发itemProcessor。
我们已经为它创建了一个CustomItemProcessor.java类。
package com.theitroad.spring;
import org.springframework.batch.item.ItemProcessor;
import com.theitroad.spring.model.Report;
public class CustomItemProcessor implements ItemProcessor<Report, Report> {
	public Report process(Report item) throws Exception {
		
		System.out.println("Processing..." + item);
		String fname = item.getFirstName();
		String lname = item.getLastName();
		
		item.setFirstName(fname.toUpperCase());
		item.setLastName(lname.toUpperCase());
		return item;
	}
}
我们可以在ItemProcessor实现中处理数据,如您所见,我正在将名字和姓氏值转换为大写。
Spring配置文件
在我们的Spring批处理配置文件中,我们导入了两个附加的配置文件–" context.xml"和" database.xml"。
<beans xmlns="https://www.springframework.org/schema/beans" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans-4.3.xsd"> <!-- stored job-meta in memory --> <!-- <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" </bean> --> <!-- stored job-meta in database --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> <property name="dataSource" ref="dataSource" <property name="transactionManager" ref="transactionManager" <property name="databaseType" value="mysql" </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" </bean> </beans>
- jobRepository – JobRepository负责将每个Java对象存储到其正确的元数据表中以进行Spring批处理。 
- transactionManager –负责在commit-interval的大小和处理的数据相等后提交事务。 
- jobLauncher –这是Spring批处理的核心。该接口包含用于触发作业的运行方法。 
<beans xmlns="https://www.springframework.org/schema/beans" xmlns:jdbc="https://www.springframework.org/schema/jdbc" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans-4.3.xsd https://www.springframework.org/schema/jdbc https://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd"> <!-- connect to database --> <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver" <property name="url" value="jdbc:mysql://localhost:3306/Test" <property name="username" value="test" <property name="password" value="test123" </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" <!-- create job-meta tables automatically --> <!-- <jdbc:initialize-database data-source="dataSource"> <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" </jdbc:initialize-database> --> </beans>
Spring Batch使用一些元数据表来存储批处理作业信息。
我们可以从spring批处理配置中创建它们,但是建议您通过执行SQL文件手动进行操作,如您在上面的注释代码中所见。
从安全角度来看,最好不要向Spring Batch数据库用户授予DDL执行权限。
Spring批表
Spring Batch表非常匹配Java中表示它们的Domain对象。
例如– JobInstance,JobExecution,JobParameters和StepExecution分别映射到BATCH_JOB_INSTANCE,BATCH_JOB_EXECUTION,BATCH_JOB_EXECUTION_PA内存S和BATCH_STEP_EXECUTION。
ExecutionContext映射到BATCH_JOB_EXECUTION_CONTEXT和BATCH_STEP_EXECUTION_CONTEXT。
JobRepository负责将每个java对象保存和存储到正确的表中。
以下是每个元数据表的详细信息。
- Batch_job_instance:BATCH_JOB_INSTANCE表保存与JobInstance相关的所有信息。 
- Batch_job_execution_params:BATCH_JOB_EXECUTION_PA内存S表包含与JobParameters对象有关的所有信息。 
- Batch_job_execution:BATCH_JOB_EXECUTION表保存与JobExecution对象有关的数据。 
 每次运行作业时,都会添加一个新行。
- Batch_step_execution:BATCH_STEP_EXECUTION表包含与StepExecution对象有关的所有信息。 
- Batch_job_execution_context:BATCH_JOB_EXECUTION_CONTEXT表保存与Job的ExecutionContext相关的数据。 
 每个JobExecution都有一个Job ExecutionContext,它包含该特定作业执行所需的所有作业级数据。
 此数据通常表示失败后必须检索的状态,以便JobInstance可以从发生故障的位置重新启动。
- Batch_step_execution_context:BATCH_STEP_EXECUTION_CONTEXT表保存与步骤的ExecutionContext相关的数据。 
 每个StepExecution都只有一个ExecutionContext,并且其中包含需要为特定步骤执行而持久化的所有数据。
 此数据通常表示失败后必须检索的状态,以便JobInstance可以从发生故障的地方重新启动。
- Batch_job_execution_seq:该表保存作业的数据执行顺序。 
- Batch_step_execution_seq:该表保存用于顺序执行的数据。 
- Batch_job_seq:该表保存作业序列的数据,以防万一我们有多个作业,我们将获得多行。 
Spring批测试程序
我们的Spring Batch示例项目已经准备就绪,最后一步是编写一个测试类以将其作为Java程序执行。
package com.theitroad.spring;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class App {
	public static void main(String[] args) {
		String[] springConfig = { "spring/batch/jobs/job-batch-demo.xml" };
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
		Job job = (Job) context.getBean("DemoJobXMLWriter");
		JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
				.toJobParameters();
		try {
			JobExecution execution = jobLauncher.run(job, jobParameters);
			System.out.println("Exit Status : " + execution.getStatus());
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("Done");
		context.close();
	}
}
只需在程序上方运行,您将获得如下所示的输出xml。
<?xml version="1.0" encoding="UTF-8"?><report><record id="1001"><dob>2013-07-29T00:00:00+05:30</dob><firstname>TOM</firstname><lastname>MOODY</lastname></record><record id="1002"><dob>2013-07-30T00:00:00+05:30</dob><firstname>JOHN</firstname><lastname>PARKER</lastname></record><record id="1003"><dob>2013-07-31T00:00:00+05:30</dob><firstname>HENRY</firstname><lastname>WILLIAMS</lastname></record></report>

