paulwong

#

各种获取JVM DUMP的方法

JVM 的线程堆栈 dump 也称 core dump,内容为文本,主要包含当时 JVM 的线程堆栈,堆 dump 也称 heap dump,内容为二进制格式,主要包含当时 JVM 堆内存中的内容。由于各个操作系统、各个 JVM 实现不同,即使同一 JVM 实现,各个版本也有差异,本文描述的方法都基于 64 位 Linux 操作系统环境,Java 8 Oracle HotSpot JVM 实现。

堆栈和堆的内容在定位问题的时候,都是非常重要的信息。线程堆栈 dump 可以了解当时 JVM 中所有线程的运行情况,比如线程的状态和当前正在运行的代码行。堆 dump 可以了解当时堆的使用情况,各个类实例的数量及各个实例所占用的空间大小。

线程堆栈

使用 jstack

jstack 是 JDK 自带的工具,用于 dump 指定进程 ID(PID)的 JVM 的线程堆栈信息。

# 打印堆栈信息到标准输出 jstack PID  
# 打印堆栈信息到标准输出,会打印关于锁的信息 jstack -l PID  
强制打印堆栈信息到标准输出,如果使用 jstack PID 没有响应的情况下(此时 JVM 进程可能挂起),
加 -F 参数 jstack -F PID 

使用 jcmd

jcmd 是 JDK 自带的工具,用于向 JVM 进程发送命令,根据命令的不同,可以代替或部分代替 jstack、jmap 等。可以发送命令 Thread.print 来打印出 JVM 的线程堆栈信息。

# 下面的命令同等于 jstack PID 
jcmd PID Thread.print  

# 同等于 jstack -l PID 
jcmd PID Thread.print -l 

使用 kill -3

kill 可以向特定的进程发送信号(SIGNAL),缺省情况是发送终止(TERM) 的信号 ,即 kill PID 与 kill -15 PID 或 kill -TERM PID 是等价的。JVM 进程会监听 QUIT 信号(其值为 3),当收到这个信号时,会打印出当时的线程堆栈和堆内存使用概要,相比 jstack,此时多了堆内存的使用概要情况。但 jstack 可以指定 -l 参数,打印锁的信息。

kill -3 PID 
# 或 kill -QUIT PID 

-XX:+HeapDumpOnOutOfMemoryError

添加 JVM 参数 -XX:+HeapDumpOnOutOfMemoryError 后,当发生 OOM(OutOfMemory)时,自动堆 dump。缺省情况下,JVM 会创建一个名称为 java_pidPID.hprof 的堆 dump 文件在 JVM 的工作目录下。但可以使用参数 -XX:HeapDumpPath=PATH 来指定 dump 文件的保存位置。

# JVM 发生 OOM 时,会自动在 /var/log/abc 目录下产生堆 dump 文件 java_pidPID.hprof 
java -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/abc/ 

jmap

jmap 也是 JDK 自带的工具,主要用于获取堆相关的信息。

堆 dump

# 将 JVM 的堆 dump 到指定文件,如果堆中对象较多,需要的时间会较长,子参数 format 只支持 b,
即二进制格式
jmap -dump:format=b,file=FILE_WITH_PATH

# 如果 JVM 进程未响应命令,可以加上参数 -F 尝试
jmap -F -dump:format=b,file=FILE_WITH_PATH

# 可以只 dump 堆中的存活对象,加上 live 子参数,但使用 -F 时不支持 live
jmap -dump:live,format=b,file=FILE_WITH_PATH

获取堆概要信息

# -heap 参数用于查看指定 JVM 进程的堆的信息,包括堆的各个参数的值,堆中新生代、年老代的内存大小、使用率等 
jmap -heap PID  

# 同样,如果 JVM 进程未响应命令,可以加上参数 -F 尝试 
jmap -F -heap PID 

一个实例输出如下:

Attaching to process ID 68322, please wait
Debugger attached successfully.
Server compiler detected.
JVM version is 25.112-b16

using thread-local object allocation.
Parallel GC with 4 thread(s)

Heap Configuration:
   MinHeapFreeRatio         = 0
   MaxHeapFreeRatio         = 100
   MaxHeapSize              = 268435456 (256.0MB)
   NewSize                  = 8388608 (8.0MB)
   MaxNewSize               = 89128960 (85.0MB)
   OldSize                  = 16777216 (16.0MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 0 (0.0MB)

Heap Usage:
PS Young Generation
Eden Space:
   capacity = 41943040 (40.0MB)
   used     = 1701504 (1.6226806640625MB)
   free     = 40241536 (38.3773193359375MB)
   4.05670166015625% used
From Space:
   capacity = 4194304 (4.0MB)
   used     = 0 (0.0MB)
   free     = 4194304 (4.0MB)
   0.0% used
To Space:
   capacity = 5242880 (5.0MB)
   used     = 0 (0.0MB)
   free     = 5242880 (5.0MB)
   0.0% used
PS Old Generation
   capacity = 30408704 (29.0MB)
   used     = 12129856 (11.56793212890625MB)
   free     = 18278848 (17.43206787109375MB)
   39.889421134159484% used

16658 interned Strings occupying 1428472 bytes.

获取堆中的类实例统计
# 打印 JVM 堆中的类实例统计信息,以占用内存的大小排序,同样,如果 JVM 未响应命令,也可以使用 -F 参数 
jmap -histo PID  

# 也可以只统计堆中的存活对象,加上 live 子参数,但使用 -F 时不支持 live 
jmap -histo:live PID 

使用 jcmd

# 等同 jmap -dump:live,format=b,file=FILE_WITH_PATH
jcmd PID GC.heap_dump FILE_WITH_PATH

# 等同 jmap -dump:format=b,file=FILE_WITH_PATH
jcmd PID GC.heap_dump -all FILE_WITH_PATH

# 等同 jmap -histo:live PID
jcmd PID GC.class_histogram

# 等同 jmap -histo PID
jcmd PID GC.class_histogram -all

posted @ 2020-02-24 22:03 paulwong 阅读(1225) | 评论 (0)编辑 收藏

Mongodb shell中select in 的实现


var bookIds = db.likes.find({userId:100}).map(function(like) { 
  return like.bookId; 
});
var books = db.books.find({_id:{$in:bookIds}});

db.REPORT_ITEM.count({REQUEST_ID : db.BATCH_CONTROL.find({ FILE_NAME : "20200218_100000.file" }).map(function(like) { 
  return like._id; 
})[0].str, JOB_TYPE_ENUM:"CHECK"})

posted @ 2020-02-21 23:10 paulwong 阅读(525) | 评论 (0)编辑 收藏

采用 jstatd 监控服务器


https://www.jianshu.com/p/97f09e1648a6

posted @ 2020-02-21 17:08 paulwong 阅读(332) | 评论 (0)编辑 收藏

mongodb错误记录

https://blog.csdn.net/wangxiaotongfan/article/details/81560463


posted @ 2020-02-21 08:50 paulwong 阅读(277) | 评论 (0)编辑 收藏

JAVA进程无故退出而没有留下LOG?

LINUX通常有个OOM KILLER进程,对于他认为吃内存的进程,会根据一定的算分,执行kill -9杀掉,查看日志如下:

#less /tmp/messages
Feb 20 03:55:09 ip kernel: Out of memory: Kill process 5974 (java) score 494 or sacrifice child
Feb 20 03:55:09 ip kernel: Killed process 5974 (java), UID 1001, total-vm:23674020kB, anon-rss:17503912kB, file-rss:0kB, shmem-rss:0kB

那如何屏蔽呢?

#/etc/cron.d/oom_disable
*/1**** root pgrep -f "java" | while read PID;do echo -17 > /proc/$PID/oom_adj;done

参考文档:
Linux内核OOM机制的详细分析
http://blog.chinaunix.net/uid-29242873-id-3942763.html


posted @ 2020-02-20 15:57 paulwong 阅读(892) | 评论 (0)编辑 收藏

!!21 MOST IMPORTANT JAVA 8 VM OPTIONS FOR SERVERS


  7 KOMMENTARE

In this video I explain some 21 JVM parameters which are suited for most server applications. If you have any questions, you can read those links below for more information or just ask in the comments section.

Java server flags video

I run several Java enterprise server applications. I often wondered – what are the best „default“ JVM settings for a server application to start with in production? I read a lot on the web and tried several things myself and wanted to share what I found out, so far. Links containing more information about JVM optimization can be found here:

http://blog.sokolenko.me/2014/11/javavm-options-production.html

http://www.petefreitag.com/articles/gctuning/

http://stas-blogspot.blogspot.de/2011/07/most-complete-list-of-xx-options-for.html

 

So let’s start:

-server

Use „-server“: All 64-bit JVMs use the server VM as default anyway. This setting generally optimizes the JVM for long running server applications instead of startup time. The JVM will collect more data about the Java byte code during program execution and generate the most efficient machine code via JIT.

-Xms=<heap size>[g|m|k] -Xmx=<heap size>[g|m|k]

The „-Xmx/-Xms“ settings specify the maximum and minimum values for the JVM heap memory. For servers, both params should have the same value to avoid heap resizing during runtime. I’ve applications running with 16GB heap sizes without an issue.

Depending on your application, you will have to try out how much memory will be best suited for your use case.

-XX:MaxMetaspaceSize=<metaspace size>[g|m|k]

Java 8 has no „Permanent Generation“ (PermGen) anymore but requires additional „Metaspace“ memory instead. This memory is used, in addition to the heap memory we specified before, for storing class meta data information.

The default size will be unlimited – I tend to limit MaxMetaspaceSize with a somewhat high value. Just in case something goes wrong with the application, the JVM will not hog all the memory of the server.

I suggest: Let your application run for a couple of days to get a feeling for how much Metaspace Size it uses normally. Upon next restart of the application set the limit to e.g. double the value.

-XX:+CMSClassUnloadingEnabled

Additionally, you might want to allow the JVM to unload classes which are held in memory but no code is pointing to them any more. If your application generates lots of dynamic classes, this is what you want.

-XX:+UseConcMarkSweepGC

This option makes the JVM use the ConcurrentMarkSweepGC – It can do much work in parallel to program execution but in some circumstances a „full GC“ with a „STW pause“ might still occur. I’ve read many articles and came to the conclusion that this GC is still the best one for server workloads.

-XX:+CMSParallelRemarkEnabled

The option CMSParallelRemarkEnabled means the remarking is done in parallel to program execution – which is what you want if your server has many cores (and most servers do).

 -XX:+UseCMSInitiatingOccupancyOnly  -XX:CMSInitiatingOccupancyFraction=<percent>

Normally the GC will use heuristics to know when it’s time to clear memory. GC might kick in too late with default settings (causing full-Gcs).
Some sources say it might be a good idea to disable heuristics altogether and just use generation occupancy to start a CMS collection cycle. Setting values around 70% worked fine for all of my applications and use cases.

-XX:+ScavengeBeforeFullGC

The first option tells the GC to first free memory by clearing out the „young generation“ or fairly new objects before doing a full GC.

-XX:+CMSScavengeBeforeRemark

CMSScavengeBeforeRemark does attempt a minor collection before the CMS remark phase – thus keeping the remark pause afterwards short.

-XX:+CMSClassUnloadingEnabled

The option „-XX:+CMSClassUnloadingEnabled“ here tells the JVM to unload classes, which are not needed any more by the running application. If you deploy war files to an application server like wildfly, tomcat or glassfish without restarting the server after the deployment, this flag is for you.

-XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses

The option „-XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses“ is especially important if your application uses RMI (remote method invocation). The usage of RMI will cause the JVM to do a FULL-GC EVERY HOUR! This might be a very bad idea for large heap sizes because the FULL-GC pause might take up to several seconds. It would be better to do a concurrent GC and try to unload unused classes to free up more memory – which is exactly what the second option does.

-XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -Xloggc:"<path to log>"

These options shown here will write out all GC related information to a specified log file. You can see how well your GC configuration works by looking into it.

I personally prefer to use the „Visual GC“ plug in for the „Visual VM“ tool to monitor the general JVM and GC behavior.

-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=<path to dump>`date`.hprof

When your JVM runs out of memory, you will want to know why. Since the OOM error might be hard to reproduce and you want to get your production server up and running again – you should specify a path for a heap dump. When things have settled down, you can analyze the dump afterwards.

-Djava.rmi.server.hostname=<external IP> -Dcom.sun.management.jmxremote.port=<port>

These options will help you to specify an IP and port for JMX – you will need those ports open to connect remotely to a JVM running on a server for tools like VisualVM. You can gain deep insights over cpu and memory usage, gc behaviour, class loading, thread count and usage of your application this way.

Visual VM
Lastly, I would like to recommend to you the VisualVM tool which is bundled with the Java 8 JDK. You can use it to gain more insights about your specific application behaviour on the JVM – like cpu and memory usage, thread utilisation and much more.

Visual GCVisualVM can be extended with a plug in called „Visual GC“. It will briefly show you VERY detailed information about the usage of the young and old generation object spaces. You can easily spot problems with garbage collection simply by analyzing these graphs during application runtime.

Thank you very much for watching! If you liked the video you might consider giving it a „thumbs up“. If you have any questions – just put them in the comments section. I will reply as quickly as possible.

-------------------------------------------------------

-XX:+UseCompressedOops [If Max Heap allocation is less than 32GB]
This can save a significant amount of memory and this option should already be enabled by default on recent java 8 versions. This option allowes object references to be stored as 32-bit values instead of 64-bit on 64-bit JVMs. This leads to before mentioned memory savings.

-XX:+AggressiveOpts
This option will enable performance options which are hoped to become enabled by default in upcoming released of the JVM. This option sets some performance settings but is marked as experimental! So you should only enable it, when you have to possibility to test your application thoroughly before enabling this flag on an production server.

-XX:+UseStringDeduplication
Since Java 8 update 20 you can use this option to reduce the memory usage of your application. The JVM will spot identical strings in memory, remove the duplicated and point all references to the remaining, single instance of the string.

-XX:+UseG1GC
Will tell the JVM to use the most recent G1 garbage collector. You are trading better application response times (due to shorter gc times with G1) against lower throughput (compared against good old ConcMarkSweepGC / CMS). If your application can deliver more value through short gc times, then G1 is definately better suited. Otherwise on Java 8, I’d recommend sticking with CMS.

Concerning your Tomcat 8 question, I’d suggest you have a look into it with the „VisualVM“ tool. Look at memory usage, GC times (visual GC plugin), pull and analyse stack traces or thread dumps to find the weak spot. You might also consider attaching a debugger to tomcat to find the bug.



https://www.maknesium.de/21-most-important-java-8-vm-options-for-servers

posted @ 2020-02-16 22:30 paulwong 阅读(342) | 评论 (0)编辑 收藏

windows下jenkins提示文件名太长

由于jenkins是调用windows的git取代码,因此是git的问题,进行如下配置即可:

git config --global core.longpaths true

posted @ 2020-02-14 14:37 paulwong 阅读(523) | 评论 (0)编辑 收藏

SPRING BOOT 环境下减少中间件依赖的UNIT测试

SPRING BOOT 环境下,测试有时会依赖于外部的中间件,如Mysql,Activemq,Mongodb等,那如何能减少这种依赖呢?
SPRING BOOT其实已经实现了自动化配置。

Mongodb

SPRING BOOT的自动化配置文件:org.springframework.boot.autoconfigure.mongo.embeddedEmbedded.MongoAutoConfiguration.java

在pom.xml中新增一test profile,并添加相应jar包,这样可防止对其他profile的影响,如果是在Eclipse跑测试,需在Project的属性中指定Active Profile为test,以覆盖pom.xml的定义。
这种方式即使是使用SPRING DATA MONGODB的REPOSITORY也是适用的。

    <profile>
        <id>test</id>
        <dependencies>
            <dependency>
                <groupId>de.flapdoodle.embed</groupId>
                <artifactId>de.flapdoodle.embed.mongo</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <activation>
            <activeByDefault>false</activeByDefault>
        </activation>
    </profile>
在application-test.yaml中添加端口,其他如IP那些信息都不需要
spring:
   data:
      mongodb:
         port: 27017

unit test config

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;

import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.util.FileCopyUtils;

@Configuration
@Profile({"test", "integrationTest"})
@EnableMongoRepositories(
        basePackages = {"paul.com.repository"
        }
)
public class EmbeddedDataSourceConfiguration {
    
    @Value("classpath:/initdata/USER.json")
    private Resource userResource;

    @Value("classpath:/initdata/MEMBERS.json")
    private Resource membersResource;
    
    @Autowired
    private ResourceLoader resourceLoader;
    
    @Autowired
    private DataSource dataSource;
    
    @Autowired
    private MongoTemplate  mongoTemplate;
    
    @PostConstruct
    protected void initialize() throws FileNotFoundException, IOException {
        this.initializeHsqldb();
        this.initializeMongodb();
    }
    
    private void initializeHsqldb() {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(resourceLoader.getResource("classpath:/org/springframework/batch/core/schema-hsqldb.sql"));
        populator.setContinueOnError(true);
        DatabasePopulatorUtils.execute(populator , dataSource);
    }
    
    private void initializeMongodb() throws FileNotFoundException, IOException {
        this.saveResource(userResource, "USER");
        
        this.saveDocumentList(membersResource, "MEMBER");
    }
    
    private void saveResource(Resource resource, String collectionName) {
        String resourceJson = this.asString(resource);
        Document resourceDocument = Document.parse(resourceJson);
        this.mongoTemplate.save(resourceDocument, collectionName);
    }
    
    private void saveDocumentList(Resource resource, String collectionName) {
        String resourceJson = this.asString(resource);
        Document resourceDocument = Document.parse("{ \"list\":" + resourceJson + "}");
        List<Document> documentList = resourceDocument.get("list", List.class);
        documentList.forEach(document -> this.mongoTemplate.save(document, collectionName));
    }
    
    private String asString(Resource resource) {
        try (Reader reader = new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8)) {
            return FileCopyUtils.copyToString(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
    
//    @Bean(destroyMethod="close")
//    public DataSource dataSource() {
//        BasicDataSource dataSource = new BasicDataSource();
//        dataSource.setDriverClassName(environment.getProperty("batch.jdbc.driver"));
//        dataSource.setUrl(environment.getProperty("batch.jdbc.url"));
//        dataSource.setUsername(environment.getProperty("batch.jdbc.user"));
//        dataSource.setPassword(environment.getProperty("batch.jdbc.password"));
//        return dataSource;
//    }
}

ActiveMQ

只需更改application-test.yml中的brokerUrl为vm://embedded即可
spring:
   activemq:
      broker-url: vm://embedded?broker.persistent=false,useShutdownHook=false
      in-memory: true
      non-blocking-redelivery: true
      #packages:
        #trust-all: false
        #trusted: com.memorynotfound
      pool:
        block-if-full: true
        block-if-full-timeout: -1
        create-connection-on-startup: true
        enabled: false
        expiry-timeout: 0
        idle-timeout: 30000
        max-connections: 1
        maximum-active-session-per-connection: 500
        reconnect-on-exception: true
        time-between-expiration-check: -1
        use-anonymous-producers: true
        user: admin
        #password: ENC(hWJHuMyhydTqyF32neasTw==)
        password: admin

关系型数据库

将在application-test.yml中的数据库信息删除,同时在pom.xml中添加jar包依赖,这边是采用HSQL数据库
    <profile>
        <id>test</id>
        <dependencies>
            <dependency>
                <groupId>org.hsqldb</groupId>
                <artifactId>hsqldb</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <activation>
            <activeByDefault>false</activeByDefault>
        </activation>
    </profile>

非SPRING BOOT/SPRING的纯JDK环境可参考
https://github.com/yandex-qatools/embedded-services

https://github.com/flapdoodle-oss/de.flapdoodle.embed.mongo

https://github.com/jonyfs/spring-boot-data-embedded-mongodb/blob/master/src/main/java/br/com/jonyfs/spring/boot/data/embedded/mongodb/config/MongoConfig.java

ActiveMQ:
https://memorynotfound.com/spring-boot-embedded-activemq-configuration-example/

posted @ 2020-02-07 10:28 paulwong 阅读(669) | 评论 (0)编辑 收藏

配置SPRING BATCH中的JUNIT TEST

关键是JobLauncherTestUtils的配置:

    @Configuration
    public class BatchTestConfiguration {
        
        
        @Bean
        public JobLauncherTestUtils stoppedReportJobLauncherTestUtils(
                JobLauncher stoppedReportJobLauncher
        ) {
            return new JobLauncherTestUtils() {
                
                @Autowired
                public void setJobLauncher(JobLauncher stoppedReportJobLauncher) {
                    super.setJobLauncher(stoppedReportJobLauncher);
                }

                @Autowired
                public void setJob(Job stoppedReportJob) {
                    super.setJob(stoppedReportJob);
                }
                
            };
        }
    }


posted @ 2020-02-03 16:47 paulwong 阅读(783) | 评论 (0)编辑 收藏

Transform RemoteChunk to remote with json format in Spring Batch

Spring Batch Remote Chunk模式下,远程执行JOB时,传输的对象是ChunkRequest/ChunkResponse,无法转成JSON格式传输。

注意此处使用的是SPRING JACKSON,而不是JACKSON。一般是在SPRING INTEGRATIONA框架下转的。

需要自定义Transformer:

JsonToChunkRequestTransformer.java
package com.frandorado.springbatchawsintegrationslave.transformer;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.stream.IntStream;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class JsonToChunkRequestTransformer extends JsonToObjectTransformer {
    
    private static final String MESSAGE_GROUP_ID_HEADER = "message-group-id";
    
    @Autowired
    AmazonSQSAsync amazonSQSAsync;
    
    @Override
    protected Object doTransform(Message<?> message) throws Exception {
        // ACK
        ack(message);
        
        return this.getMessageBuilderFactory().withPayload(buildChunkRequest(message)).setHeader(MESSAGE_GROUP_ID_HEADER, "unique").build();
    }
    
    private ChunkRequest buildChunkRequest(Message<?> message) throws IOException {
        Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
        Map stepContributionMap = (Map) map.get("stepContribution");
        Map exitStatusMap = (Map) stepContributionMap.get("exitStatus");
        
        StepContribution stepContribution = new StepContribution(new StepExecution("null", null));
        ExitStatus exitStatus = new ExitStatus((String) exitStatusMap.get("exitCode"), (String) exitStatusMap.get("exitDescription"));
        
        IntStream.range(0, (Integer) stepContributionMap.get("readCount")).forEach(e -> stepContribution.incrementReadCount());
        stepContribution.incrementWriteCount((Integer) stepContributionMap.get("writeCount"));
        stepContribution.incrementFilterCount((Integer) stepContributionMap.get("filterCount"));
        stepContribution.incrementReadSkipCount((Integer) stepContributionMap.get("readSkipCount"));
        IntStream.range(0, (Integer) stepContributionMap.get("writeSkipCount")).forEach(e -> stepContribution.incrementWriteSkipCount());
        IntStream.range(0, (Integer) stepContributionMap.get("processSkipCount"))
                .forEach(e -> stepContribution.incrementProcessSkipCount());
        stepContribution.setExitStatus(exitStatus);
        
        return new ChunkRequest((Integer) map.get("sequence"), (Collection) map.get("items"), (Integer) map.get("jobId"), stepContribution);
    }
    
    private void ack(Message<?> message) {
        String receiptHandle = message.getHeaders().get(AwsHeaders.RECEIPT_HANDLE, String.class);
        String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class);
        String queueUrl = amazonSQSAsync.getQueueUrl(queue).getQueueUrl();
        
        amazonSQSAsync.deleteMessage(queueUrl, receiptHandle);
    }
}


JsonToChunkResponseTransformer.java
package com.frandorado.springbatchawsintegrationmaster.transformer;

import java.io.IOException;
import java.util.Map;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.integration.chunk.ChunkResponse;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class JsonToChunkResponseTransformer extends JsonToObjectTransformer {
    
    @Override
    protected Object doTransform(Message<?> message) throws Exception {
        return buildChunkResponse(message);
    }
    
    private ChunkResponse buildChunkResponse(Message<?> message) throws IOException {
        Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
        
        Integer jobId = (Integer) map.get("jobId");
        Integer sequence = (Integer) map.get("sequence");
        String messageContent = (String) map.get("message");
        Boolean status = (Boolean) map.get("successful");
        
        StepContribution stepContribution = new StepContribution(new StepExecution("-", null));
        
        return new ChunkResponse(status, sequence, Long.valueOf(jobId), stepContribution, messageContent);
    }
}


还有一种方式,就是如果第三类不支持转JSON,即代码里没有JACKSON的注解,可以采用MIXIN的方式:

StepExecutionJacksonMixIn.java
package org.springframework.cloud.dataflow.rest.client.support;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.springframework.batch.core.StepExecution;

/**
 * Jackson MixIn for {
@link StepExecution} de-serialization.
 *
 * 
@author Gunnar Hillert
 * 
@since 1.0
 
*/
@JsonIgnoreProperties({ "jobExecution", "jobParameters", "jobExecutionId", "skipCount", "summary" })
public abstract class StepExecutionJacksonMixIn {

    @JsonCreator
    StepExecutionJacksonMixIn(@JsonProperty("stepName") String stepName) {
    }

}

在配置文件中注册才能使用:
JacksonConfiguration.java
import java.util.Locale;
import java.util.TimeZone;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.ISO8601DateFormatWithMilliSeconds;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExecutionContextJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExitStatusJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobExecutionJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobInstanceJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParameterJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParametersJacksonMixIn;
import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.StepExecutionJacksonMixIn;

@Configuration
public class JacksonConfiguration {

    @Bean
    public Jackson2JsonObjectMapper jackson2JsonObjectMapper(ObjectMapper objectMapper) {
        return new Jackson2JsonObjectMapper(objectMapper);
    }
    
    @Bean
    public Jackson2ObjectMapperBuilderCustomizer dataflowObjectMapperBuilderCustomizer() {
        return (builder) -> {
            builder.dateFormat(new ISO8601DateFormatWithMilliSeconds(TimeZone.getDefault(), Locale.getDefault(), true));
            // apply SCDF Batch Mixins to
            
// ignore the JobExecution in StepExecution to prevent infinite loop.
            
// https://github.com/spring-projects/spring-hateoas/issues/333
            builder.mixIn(StepExecution.class, StepExecutionJacksonMixIn.class);
            builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
            builder.mixIn(JobExecution.class, JobExecutionJacksonMixIn.class);
            builder.mixIn(JobParameters.class, JobParametersJacksonMixIn.class);
            builder.mixIn(JobParameter.class, JobParameterJacksonMixIn.class);
            builder.mixIn(JobInstance.class, JobInstanceJacksonMixIn.class);
//            builder.mixIn(StepExecutionHistory.class, StepExecutionHistoryJacksonMixIn.class);
            builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
            builder.mixIn(ExitStatus.class, ExitStatusJacksonMixIn.class);
//            objectMapper.setDateFormat(new ISO8601DateFormatWithMilliSeconds());
            builder.modules(new JavaTimeModule(), new Jdk8Module());
        };
    }
}

    @Bean
    public IntegrationFlow flow4Contribution(
            ConnectionFactory connectionFactory, 
            JobProperties jobProperties,
            Jackson2JsonObjectMapper jackson2JsonObjectMapper
    ) {
        return IntegrationFlows
                    .from(request4ContributionMaster())
                    .enrichHeaders(headerEnricherConfigurer())
                    .transform(Transformers.toJson(jackson2JsonObjectMapper))
                    .handle(jmsOutboundGateway4Contribution(connectionFactory, jobProperties))
                    .transform(Transformers.fromJson(StepExecution.class, jackson2JsonObjectMapper))
                    .channel(replies4ContributionMaster(null))
                    .get();
    }


https://github.com/spring-cloud/spring-cloud-dataflow/tree/master/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/support

https://frandorado.github.io/spring/2019/07/29/spring-batch-aws-series-introduction.html

https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-master/src/main/java/com/frandorado/springbatchawsintegrationmaster/transformer


https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-slave/src/main/java/com/frandorado/springbatchawsintegrationslave/transformer

posted @ 2020-01-21 16:44 paulwong 阅读(575) | 评论 (0)编辑 收藏

仅列出标题
共112页: First 上一页 16 17 18 19 20 21 22 23 24 下一页 Last