Spring Batch Remote Chunk模式下,远程执行JOB时,传输的对象是ChunkRequest/ChunkResponse,无法转成JSON格式传输。
注意此处使用的是SPRING JACKSON,而不是JACKSON。一般是在SPRING INTEGRATION框架下转的。
需要自定义Transformer:
JsonToChunkRequestTransformer.java
package com.frandorado.springbatchawsintegrationslave.transformer;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.stream.IntStream;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
public class JsonToChunkRequestTransformer extends JsonToObjectTransformer {

    /** Header required by SQS FIFO queues to group messages; value is constant here. */
    private static final String MESSAGE_GROUP_ID_HEADER = "message-group-id";

    // ObjectMapper is thread-safe once configured; reuse a single instance
    // instead of allocating a new one for every incoming message.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    AmazonSQSAsync amazonSQSAsync;

    /**
     * Transforms the incoming JSON payload into a message carrying a
     * {@link ChunkRequest}. The raw SQS message is acknowledged (deleted)
     * first so it is not redelivered while the chunk is being processed.
     *
     * @param message the message whose payload is a JSON-serialized ChunkRequest
     * @return a new message with the reconstructed ChunkRequest payload and the
     *         SQS FIFO message-group-id header set
     * @throws Exception if the JSON payload cannot be parsed
     */
    @Override
    protected Object doTransform(Message<?> message) throws Exception {
        // ACK: remove the raw message from the queue before processing.
        ack(message);
        return this.getMessageBuilderFactory()
                .withPayload(buildChunkRequest(message))
                .setHeader(MESSAGE_GROUP_ID_HEADER, "unique")
                .build();
    }

    /**
     * Rebuilds a {@link ChunkRequest} from the generic JSON map. The counters
     * are replayed onto a fresh {@link StepContribution} one by one because
     * StepContribution exposes no JSON-friendly constructor or bulk setters.
     *
     * @throws IOException if the payload is not valid JSON
     */
    private ChunkRequest buildChunkRequest(Message<?> message) throws IOException {
        Map<String, Object> map = OBJECT_MAPPER.readValue(message.getPayload().toString(), Map.class);
        Map<String, Object> stepContributionMap = (Map<String, Object>) map.get("stepContribution");
        Map<String, Object> exitStatusMap = (Map<String, Object>) stepContributionMap.get("exitStatus");

        // Parent StepExecution is a throwaway shell; only the contribution counters matter.
        StepContribution stepContribution = new StepContribution(new StepExecution("null", null));
        ExitStatus exitStatus = new ExitStatus(
                (String) exitStatusMap.get("exitCode"),
                (String) exitStatusMap.get("exitDescription"));

        // read/writeSkip/processSkip counters only expose increment-by-one methods.
        IntStream.range(0, (Integer) stepContributionMap.get("readCount"))
                .forEach(e -> stepContribution.incrementReadCount());
        stepContribution.incrementWriteCount((Integer) stepContributionMap.get("writeCount"));
        stepContribution.incrementFilterCount((Integer) stepContributionMap.get("filterCount"));
        stepContribution.incrementReadSkipCount((Integer) stepContributionMap.get("readSkipCount"));
        IntStream.range(0, (Integer) stepContributionMap.get("writeSkipCount"))
                .forEach(e -> stepContribution.incrementWriteSkipCount());
        IntStream.range(0, (Integer) stepContributionMap.get("processSkipCount"))
                .forEach(e -> stepContribution.incrementProcessSkipCount());
        stepContribution.setExitStatus(exitStatus);

        return new ChunkRequest(
                (Integer) map.get("sequence"),
                (Collection) map.get("items"),
                (Integer) map.get("jobId"),
                stepContribution);
    }

    /**
     * Acknowledges the SQS message by deleting it from its queue, using the
     * receipt handle and queue name carried in the message headers.
     */
    private void ack(Message<?> message) {
        String receiptHandle = message.getHeaders().get(AwsHeaders.RECEIPT_HANDLE, String.class);
        String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class);
        String queueUrl = amazonSQSAsync.getQueueUrl(queue).getQueueUrl();
        amazonSQSAsync.deleteMessage(queueUrl, receiptHandle);
    }
}
JsonToChunkResponseTransformer.java
package com.frandorado.springbatchawsintegrationmaster.transformer;
import java.io.IOException;
import java.util.Map;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.integration.chunk.ChunkResponse;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
public class JsonToChunkResponseTransformer extends JsonToObjectTransformer {

    // ObjectMapper is thread-safe once configured; reuse a single instance
    // instead of allocating a new one for every incoming message.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Transforms the incoming JSON payload into a {@link ChunkResponse}.
     *
     * @param message the message whose payload is a JSON-serialized ChunkResponse
     * @return the reconstructed ChunkResponse
     * @throws Exception if the JSON payload cannot be parsed
     */
    @Override
    protected Object doTransform(Message<?> message) throws Exception {
        return buildChunkResponse(message);
    }

    /**
     * Rebuilds a {@link ChunkResponse} from the generic JSON map. Only the
     * identifying fields (jobId, sequence, message, successful) are restored;
     * the StepContribution is a placeholder since its counters are not needed
     * on the master side.
     *
     * @throws IOException if the payload is not valid JSON
     */
    private ChunkResponse buildChunkResponse(Message<?> message) throws IOException {
        Map<String, Object> map = OBJECT_MAPPER.readValue(message.getPayload().toString(), Map.class);

        Integer jobId = (Integer) map.get("jobId");
        Integer sequence = (Integer) map.get("sequence");
        String messageContent = (String) map.get("message");
        Boolean status = (Boolean) map.get("successful");

        // Placeholder contribution: ChunkResponse requires one, but its counters
        // are not consumed here.
        StepContribution stepContribution = new StepContribution(new StepExecution("-", null));

        return new ChunkResponse(status, sequence, Long.valueOf(jobId), stepContribution, messageContent);
    }
}
还有一种方式,就是如果第三方类不支持转JSON,即代码里没有JACKSON的注解,可以采用MIXIN的方式:
StepExecutionJacksonMixIn.java
package org.springframework.cloud.dataflow.rest.client.support;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.batch.core.StepExecution;
/**
 * Jackson MixIn for {@link StepExecution} de-serialization.
 *
 * @author Gunnar Hillert
 * @since 1.0
 */
// Mixin classes let Jackson (de)serialize third-party types that carry no
// Jackson annotations: the annotations declared here are applied to
// StepExecution when the mixin is registered on the ObjectMapper.
// "jobExecution" is ignored to break the StepExecution <-> JobExecution cycle
// (infinite recursion during serialization); the other ignored properties are
// derived values with no matching setters.
@JsonIgnoreProperties({ "jobExecution", "jobParameters", "jobExecutionId", "skipCount", "summary" })
public abstract class StepExecutionJacksonMixIn {
// Tells Jackson which constructor to use and binds the JSON "stepName"
// field to its single argument; the body is never invoked (mixins are
// metadata only, hence abstract).
@JsonCreator
StepExecutionJacksonMixIn(@JsonProperty("stepName") String stepName) {
}
}
在配置文件中注册才能使用:
JacksonConfiguration.java
import java.util.Locale;
import java.util.TimeZone;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.ISO8601DateFormatWithMilliSeconds;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExecutionContextJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExitStatusJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobExecutionJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobInstanceJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParameterJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParametersJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.StepExecutionJacksonMixIn;
@Configuration
public class JacksonConfiguration {

    /**
     * Wraps the Spring-managed {@link ObjectMapper} for use by Spring
     * Integration's JSON transformers, so both share one configuration.
     */
    @Bean
    public Jackson2JsonObjectMapper jackson2JsonObjectMapper(ObjectMapper objectMapper) {
        return new Jackson2JsonObjectMapper(objectMapper);
    }

    /**
     * Customizes the Boot-built ObjectMapper: ISO-8601 millisecond date format,
     * the SCDF batch mixins (so Spring Batch domain types, which carry no
     * Jackson annotations, can be (de)serialized), and the JSR-310/JDK8 modules.
     */
    @Bean
    public Jackson2ObjectMapperBuilderCustomizer dataflowObjectMapperBuilderCustomizer() {
        return (builder) -> {
            builder.dateFormat(new ISO8601DateFormatWithMilliSeconds(TimeZone.getDefault(), Locale.getDefault(), true));
            // Apply SCDF Batch mixins.
            // The StepExecution mixin ignores its JobExecution to prevent an
            // infinite serialization loop.
            // https://github.com/spring-projects/spring-hateoas/issues/333
            builder.mixIn(StepExecution.class, StepExecutionJacksonMixIn.class);
            builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
            builder.mixIn(JobExecution.class, JobExecutionJacksonMixIn.class);
            builder.mixIn(JobParameters.class, JobParametersJacksonMixIn.class);
            builder.mixIn(JobParameter.class, JobParameterJacksonMixIn.class);
            builder.mixIn(JobInstance.class, JobInstanceJacksonMixIn.class);
            // builder.mixIn(StepExecutionHistory.class, StepExecutionHistoryJacksonMixIn.class);
            // NOTE: the duplicate ExecutionContext mixin registration that was
            // here has been removed — it was already registered above.
            builder.mixIn(ExitStatus.class, ExitStatusJacksonMixIn.class);
            // objectMapper.setDateFormat(new ISO8601DateFormatWithMilliSeconds());
            builder.modules(new JavaTimeModule(), new Jdk8Module());
        };
    }
}
// Integration flow for the master side of remote chunking: serializes the
// outgoing contribution to JSON, sends it over JMS, and deserializes the
// reply back into a StepExecution. Relies on helper bean methods
// (request4ContributionMaster, headerEnricherConfigurer,
// jmsOutboundGateway4Contribution, replies4ContributionMaster) defined
// elsewhere in the enclosing configuration class — not visible here.
@Bean
public IntegrationFlow flow4Contribution(
ConnectionFactory connectionFactory,
JobProperties jobProperties,
Jackson2JsonObjectMapper jackson2JsonObjectMapper
) {
return IntegrationFlows
// Source channel produced by the master-side request bean.
.from(request4ContributionMaster())
.enrichHeaders(headerEnricherConfigurer())
// Outbound: object -> JSON using the shared, mixin-aware mapper.
.transform(Transformers.toJson(jackson2JsonObjectMapper))
.handle(jmsOutboundGateway4Contribution(connectionFactory, jobProperties))
// Inbound reply: JSON -> StepExecution with the same mapper.
.transform(Transformers.fromJson(StepExecution.class, jackson2JsonObjectMapper))
// NOTE(review): replies4ContributionMaster(null) — passing null here
// presumably lets Spring inject the real dependency when resolving the
// bean method; confirm against the enclosing configuration.
.channel(replies4ContributionMaster(null))
.get();
}
https://github.com/spring-cloud/spring-cloud-dataflow/tree/master/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/support
https://frandorado.github.io/spring/2019/07/29/spring-batch-aws-series-introduction.html
https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-master/src/main/java/com/frandorado/springbatchawsintegrationmaster/transformer
https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-slave/src/main/java/com/frandorado/springbatchawsintegrationslave/transformer