Spring批处理示例
欢迎使用Spring Batch示例。
Spring Batch是用于执行批处理作业的Spring框架模块。
我们可以使用spring batch来处理一系列作业。
Spring批处理示例
在进行Spring批处理示例程序之前,让我们对Spring批处理术语有所了解。
一项工作可以包含" n"个步骤。
每个步骤都包含Read-Process-Write任务,或者它可以具有单个操作,称为tasklet。基本上从诸如数据库,CSV等之类的源读取Read-Process-Write,然后处理数据并将其写入诸如数据库,CSV,XML等之类的源。
Tasklet意味着执行单个任务或者操作,例如清理连接,在处理完成后释放资源。
可以将Read-Process-Write和Tasklet链接在一起以运行作业。
Spring批处理示例
让我们考虑一个实现Spring Batch的可行示例。
为了实现目的,我们将考虑以下情形。
包含数据的CSV文件需要与数据一起转换为XML,并且标记将以列名命名。
以下是用于Spring批处理示例的重要工具和库。
Apache Maven 3.5.0 –用于项目构建和依赖项管理。
Eclipse Oxygen版本4.7.0 –用于创建spring batch maven应用程序的IDE。
Java 1.8
Spring Core 4.3.12。RELEASE
Spring OXM 4.3.12。RELEASE
Spring JDBC 4.3.12。RELEASE
Spring Batch 3.0.8。RELEASE
MySQL Java驱动程序5.1.25 –根据您的MySQL安装使用。这是Spring Batch元数据表所必需的。
Spring Batch示例目录结构
下图说明了我们的Spring Batch示例项目中的所有组件。
Spring Batch Maven依赖关系
以下是pom.xml文件的内容,其中包含我们的Spring Batch示例项目所需的所有必需依赖项。
<project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.theitroad.spring</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SpringBatchDemo</name> <url>https://maven.apache.org</url> <properties> <jdk.version>1.8</jdk.version> <spring.version>4.3.12.RELEASE</spring.version> <spring.batch.version>3.0.8.RELEASE</spring.batch.version> <mysql.driver.version>5.1.25</mysql.driver.version> <junit.version>4.11</junit.version> </properties> <dependencies> <!-- Spring Core --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring jdbc, for database --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring XML to/back object --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>${spring.version}</version> </dependency> <!-- MySQL database driver --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.driver.version}</version> </dependency> <!-- Spring Batch dependencies --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>${spring.batch.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-infrastructure</artifactId> <version>${spring.batch.version}</version> </dependency> <!-- Spring Batch unit test --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <version>${spring.batch.version}</version> </dependency> <!-- Junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.4.10</version> </dependency> </dependencies> <build> <finalName>spring-batch</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <version>2.9</version> <configuration> <downloadSources>true</downloadSources> <downloadJavadocs>false</downloadJavadocs> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>${jdk.version}</source> <target>${jdk.version}</target> </configuration> </plugin> </plugins> </build> </project>
Spring Batch处理CSV输入文件
这是用于Spring批处理的示例CSV文件的内容。
1001,Tom,Moody, 29/7/2013 1002,John,Parker, 30/7/2013 1003,Henry,Williams, 31/7/2013
Spring Batch作业配置
我们必须在配置文件中定义spring bean和spring批处理作业。
以下是" job-batch-demo.xml"文件的内容,它是Spring批处理项目中最重要的部分。
<beans xmlns="https://www.springframework.org/schema/beans" xmlns:batch="https://www.springframework.org/schema/batch" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch-3.0.xsd https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans-4.3.xsd "> <import resource="../config/context.xml" <import resource="../config/database.xml" <bean id="report" class="com.theitroad.spring.model.Report" scope="prototype" <bean id="itemProcessor" class="com.theitroad.spring.CustomItemProcessor" <batch:job id="DemoJobXMLWriter"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="csvFileItemReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="classpath:csv/input/report.csv" <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="names" value="id,firstname,lastname,dob" </bean> </property> <property name="fieldSetMapper"> <bean class="com.theitroad.spring.ReportFieldSetMapper" <!-- if no data type conversion, use BeanWrapperFieldSetMapper to map by name <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"> <property name="prototypeBeanName" value="report" </bean> --> </property> </bean> </property> </bean> <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"> <property name="resource" value="file:xml/outputs/report.xml" <property name="marshaller" ref="reportMarshaller" <property name="rootTagName" value="report" </bean> <bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller"> <property name="classesToBeBound"> <list> <value>com.theitroad.spring.model.Report</value> </list> </property> </bean> </beans>
我们正在使用FlatFileItemReader读取CSV文件,使用CustomItemProcessor处理数据并使用StaxEventItemWriter写入XML文件。
batch:job
–这个标签定义了我们要创建的工作。
Id属性指定作业的ID。
我们可以在一个xml文件中定义多个作业。batch:step –此标签用于定义弹簧批处理作业的不同步骤。
Spring Batch Framework提供了两种不同类型的处理样式,分别是"面向TaskletStep"和"面向块"。
本示例中使用的"块导向"样式指的是在事务边界内一次读取数据并创建要写入的"块"。reader:用于读取数据的spring bean。
在这个例子中,我们使用了csvFileItemReader
bean,它是FlatFileItemReader
的实例。处理器:这是用于处理数据的类。
在此示例中,我们使用了" CustomItemProcessor"。writer:用于将数据写入xml文件的bean。
commit-interval:此属性定义处理完成后将提交的块的大小。
基本上,这意味着ItemReader将一一读取数据,ItemProcessor也将以相同的方式处理数据,但ItemWriter仅在等于commit-interval大小时才写入数据。此项目中使用的三个重要接口是来自org.springframework.batch.item包的ItemReader,ItemProcessor和ItemWriter。
Spring批模型类
首先,我们将CSV文件读入java对象,然后使用JAXB将其写入xml文件。
下面是带有必需的JAXB批注的模型类。
package com.theitroad.spring.model; import java.util.Date; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; @XmlRootElement(name = "record") public class Report { private int id; private String firstName; private String lastName; private Date dob; @XmlAttribute(name = "id") public int getId() { return id; } public void setId(int id) { this.id = id; } @XmlElement(name = "firstname") public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } @XmlElement(name = "lastname") public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @XmlElement(name = "dob") public Date getDob() { return dob; } public void setDob(Date dob) { this.dob = dob; } @Override public String toString() { return "Report [id=" + id + ", firstname=" + firstName + ", lastName=" + lastName + ", DateOfBirth=" + dob + "]"; } }
请注意,模型类字段应与Spring Batch Mapper配置中定义的字段相同,即本例中的"属性名称="名称"值=" id,名字,姓氏,标识"。
Spring Batch FieldSetMapper
需要自定义FieldSetMapper
来转换日期。
如果不需要数据类型转换,则仅应使用BeanWrapperFieldSetMapper来按名称自动映射值。
扩展FieldSetMapper的Java类是ReportFieldSetMapper。
package com.theitroad.spring; import java.text.ParseException; import java.text.SimpleDateFormat; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException; import com.theitroad.spring.model.Report; public class ReportFieldSetMapper implements FieldSetMapper<Report> { private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); public Report mapFieldSet(FieldSet fieldSet) throws BindException { Report report = new Report(); report.setId(fieldSet.readInt(0)); report.setFirstName(fieldSet.readString(1)); report.setLastName(fieldSet.readString(2)); //default format yyyy-MM-dd //fieldSet.readDate(4); String date = fieldSet.readString(3); try { report.setDob(dateFormat.parse(date)); } catch (ParseException e) { e.printStackTrace(); } return report; } }
Spring批项目处理器
现在,按照作业配置中的定义,将在itemWriter之前触发itemProcessor。
我们已经为它创建了一个CustomItemProcessor.java类。
package com.theitroad.spring; import org.springframework.batch.item.ItemProcessor; import com.theitroad.spring.model.Report; public class CustomItemProcessor implements ItemProcessor<Report, Report> { public Report process(Report item) throws Exception { System.out.println("Processing..." + item); String fname = item.getFirstName(); String lname = item.getLastName(); item.setFirstName(fname.toUpperCase()); item.setLastName(lname.toUpperCase()); return item; } }
我们可以在ItemProcessor实现中处理数据,如您所见,我正在将名字和姓氏值转换为大写。
Spring配置文件
在我们的Spring批处理配置文件中,我们导入了两个附加的配置文件–" context.xml"和" database.xml"。
<beans xmlns="https://www.springframework.org/schema/beans" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans-4.3.xsd"> <!-- stored job-meta in memory --> <!-- <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" </bean> --> <!-- stored job-meta in database --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> <property name="dataSource" ref="dataSource" <property name="transactionManager" ref="transactionManager" <property name="databaseType" value="mysql" </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" </bean> </beans>
jobRepository – JobRepository负责将每个Java对象存储到其正确的元数据表中以进行Spring批处理。
transactionManager –负责在commit-interval的大小和处理的数据相等后提交事务。
jobLauncher –这是Spring批处理的核心。该接口包含用于触发作业的运行方法。
<beans xmlns="https://www.springframework.org/schema/beans" xmlns:jdbc="https://www.springframework.org/schema/jdbc" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans-4.3.xsd https://www.springframework.org/schema/jdbc https://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd"> <!-- connect to database --> <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver" <property name="url" value="jdbc:mysql://localhost:3306/Test" <property name="username" value="test" <property name="password" value="test123" </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" <!-- create job-meta tables automatically --> <!-- <jdbc:initialize-database data-source="dataSource"> <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" </jdbc:initialize-database> --> </beans>
Spring Batch使用一些元数据表来存储批处理作业信息。
我们可以从spring批处理配置中创建它们,但是建议您通过执行SQL文件手动进行操作,如您在上面的注释代码中所见。
从安全角度来看,最好不要向Spring Batch数据库用户授予DDL执行权限。
Spring批表
Spring Batch表非常匹配Java中表示它们的Domain对象。
例如– JobInstance,JobExecution,JobParameters和StepExecution分别映射到BATCH_JOB_INSTANCE,BATCH_JOB_EXECUTION,BATCH_JOB_EXECUTION_PA内存S和BATCH_STEP_EXECUTION。
ExecutionContext映射到BATCH_JOB_EXECUTION_CONTEXT和BATCH_STEP_EXECUTION_CONTEXT。
JobRepository负责将每个java对象保存和存储到正确的表中。
以下是每个元数据表的详细信息。
Batch_job_instance:BATCH_JOB_INSTANCE表保存与JobInstance相关的所有信息。
Batch_job_execution_params:BATCH_JOB_EXECUTION_PA内存S表包含与JobParameters对象有关的所有信息。
Batch_job_execution:BATCH_JOB_EXECUTION表保存与JobExecution对象有关的数据。
每次运行作业时,都会添加一个新行。Batch_step_execution:BATCH_STEP_EXECUTION表包含与StepExecution对象有关的所有信息。
Batch_job_execution_context:BATCH_JOB_EXECUTION_CONTEXT表保存与Job的ExecutionContext相关的数据。
每个JobExecution都有一个Job ExecutionContext,它包含该特定作业执行所需的所有作业级数据。
此数据通常表示失败后必须检索的状态,以便JobInstance可以从发生故障的位置重新启动。Batch_step_execution_context:BATCH_STEP_EXECUTION_CONTEXT表保存与步骤的ExecutionContext相关的数据。
每个StepExecution都只有一个ExecutionContext,并且其中包含需要为特定步骤执行而持久化的所有数据。
此数据通常表示失败后必须检索的状态,以便JobInstance可以从发生故障的地方重新启动。Batch_job_execution_seq:该表保存作业的数据执行顺序。
Batch_step_execution_seq:该表保存用于顺序执行的数据。
Batch_job_seq:该表保存作业序列的数据,以防万一我们有多个作业,我们将获得多行。
Spring批测试程序
我们的Spring Batch示例项目已经准备就绪,最后一步是编写一个测试类以将其作为Java程序执行。
package com.theitroad.spring; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.context.support.ClassPathXmlApplicationContext; public class App { public static void main(String[] args) { String[] springConfig = { "spring/batch/jobs/job-batch-demo.xml" }; ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig); JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); Job job = (Job) context.getBean("DemoJobXMLWriter"); JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()) .toJobParameters(); try { JobExecution execution = jobLauncher.run(job, jobParameters); System.out.println("Exit Status : " + execution.getStatus()); } catch (Exception e) { e.printStackTrace(); } System.out.println("Done"); context.close(); } }
只需在程序上方运行,您将获得如下所示的输出xml。
<?xml version="1.0" encoding="UTF-8"?><report><record id="1001"><dob>2013-07-29T00:00:00+05:30</dob><firstname>TOM</firstname><lastname>MOODY</lastname></record><record id="1002"><dob>2013-07-30T00:00:00+05:30</dob><firstname>JOHN</firstname><lastname>PARKER</lastname></record><record id="1003"><dob>2013-07-31T00:00:00+05:30</dob><firstname>HENRY</firstname><lastname>WILLIAMS</lastname></record></report>