大大毛
的笔记
  DDM's Note
哪怕没有办法一定有说法,
就算没有鸽子一定有乌鸦,
固执无罪 梦想有价,
让他们惊讶.
posts - 14, comments - 23, trackbacks - 0, articles - 58
::
首页
:: ::
联系
:: ::
管理
Kafka资料落地至MariaDB (带Key的新增、修改和删除)
Posted on 2019-04-11 15:40
大大毛
阅读(274)
评论(0)
编辑
收藏
所属分类:
Nifi
需求:
比较前一篇文章来说,仅加多Delete的行为。例如我仅需要Status=1的资料,所以对于资料落地来讲,最合适的莫过于下面这样,Table也可以自动保持最少量的有效资料
1. 新资料Status=0,执行Insert;
2. 资料修改Status=0,执行Update;
3. 资料状态变更Status=1,执行Delete;
4. 若资料状态重新变更为0,则又会执行Insert;
思路:
按理来说,只要通过分支Route就可以将Insert/Update与Delete作业分成两条Nifi支流(看网上确实有很多这么整法的),但是用Route有一个问题处理不了,那就是资料顺序的正确性你是无法保证的。对于小数据量的场景来说,每笔Key的多次操作间隔可能会比较长,所以它不会有什么问题,但大数据量的情况下,两同相同Key值的资料走Route后被处理的顺序混乱就会造成最终资料结果的异常(比如应该是先Insert再Delete,结果却是发现资料还躺在Table中)。而大数据量在使用Kafka做为数据源时就不可避免会出现:即使业务数据量确实不大,但对于积累了好几天的数据再进行接收时,那一瞬间的数据量也会是很大的。
所以我们能做的就是动态决定执行Delete和Insert。
解决方案:
虽然与
前一篇
来说差异不大,但Nifi流程上却有很大不同,下面会详细描述为什么要这样做
Processor及其设定:
ConsumeKafkaRecord、SplitJson、Connection、EvaluateJsonPath
,
与前一章的一样,只是不同数据下解析的属性有所不同,这里不再详述。
UpdateAttribute
,
作用是从Kafka中Consume出资料(以Record的形态),这里使用Record是因为源数据就是以Record的方式存上去的 (Avro Schema)
SQL1
:这才是这次花招的关键,在这里根据STATUS自行构建SQL语句
Status=0
:构建的就是Delete From xxx Where Key1=? and Key2=? 这样的删除语句
Status!=0
:构建的就是Replace Into xxx (Key1,Key2,col3,col4) Values ( ?, ?, ?, ?) 这样的Insert or Update语句
经此Processor处理后,资料落地所需的SQL就构建好了,后续的问题就是如何去绑定参数和执行
ReplaceText
,
作用是对FlowFile进行文本替换,这里使用它来直接产生我所需的JSON内容
Search Value
:这里使用的是Default的
(?s)(^.*$),作用就是把原先的整份文件全部换掉
Replacement Value
:这里放的就是一个固定结构的JSON,可以看到里面的属性值都是使用的Attribute (它们的值来源于前面的EvaluateJsonPath从源JSON文件中的提取)
细心的朋友可以发现这里是与前一篇文章的最大不同,这里没有使用AttributeToJson去直接产生JSON文件,而使用的是更加笨拙的方式
前面的文章有提过,我们产生的Attribute以及AttributeToJson所生成JSON中各属性的顺序问题,结论是怎么搞它都不是我所想象到的顺序。但是ConvertJsonToSQL这个东东却很实在,它确确实实是按JSON中属性的顺序去生成的SQL以及参数名称(还记得参数名称sql.args.
1
.value中的这个顺序
1
么),所以问题就来了:
SQL由于必须要有Delete和Replace,所以它们的参数个数一定是不同的,而Delete压的参数又是我们的Key,所以就必须要保证ConvertJsonToSQL生成属性的顺序,这样我们才能够保证我们的两个Key一定会是sql.args.1和sql.args.2
换句话说,如果AttributesToJson若是能够保证JSON属性顺序的话,那就不用这么费劲
ConvertJsonToSQL
,
与前文一样,以Insert的方式生成SQL和绑定参数即可
UpdateAttribute
,
终于用到了它的Delete功能,作用是清除掉多余的SQL绑定参数
Delete Attributes Expression
:这里我根据Delete的条件(STATUS=0)去删除多余的SQL绑定参数
这里的写法比较死,我Hard-code删除掉大与2的其它所有参数("
*
"
是一个通配符,"
|
"是一个多条件的间隔符),感觉上还有更好的写法
至此我们就可以保证绑定参数的数量与SQL语法参数个数一致 (不一致它死给你看)
PutSQL
,
这里仍然是执行SQL,这里使用配置参数的形式让它执行我们的SQL
SQL Statement
:前面用UpdateAttribute产生的SQL1参数,它会根据STATUS=0去判断是使用DELETE还是REPLACE语法
这个属性压上后,无论SQL1是不是为空,这个组件都不会再去管FlowFile的内容(空属性时是把FlowFile的内容当成SQL去执行的)
新用户注册
刷新评论列表
只有注册用户
登录
后才能发表评论。
网站导航:
博客园
IT新闻
Chat2DB
C++博客
博问
管理
相关文章:
Nifi同步数据的几种方法
RouteOnAttribute的用法
Kafka资料落地至MariaDB (带Key的新增、修改和删除)
Kafka资料落地至MariaDB (带Key的新增、修改)
Oracle资料推送MQTT
Powered by:
BlogJava
Copyright © 大大毛
日历
<
2025年1月
>
日
一
二
三
四
五
六
29
30
31
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1
2
3
4
5
6
7
8
公告
果然是不能想得太好。
随笔分类
(4)
VB培训(4)
文章分类
(59)
JAVA
(6)
Spring
(3)
Hibernate
Struts
(12)
NET
VB
(2)
ASP
(1)
ASP.NET
(6)
HTML
(3)
400
(2)
I4.0
Nifi
(5)
Angular
(1)
SQL
(15)
常用算法
(1)
其它
(2)
积分与排名
积分 - 59631
排名 - 874
最新评论
1. re: 手工添加MyEclipse的XML文件模板[未登录]
请问,eclipse下面有没有呢?现在想要实现eclipse的xml的模板进行配置修改,简单说,就是把新建时候的名字作为其中的一个tag;找了很久没有找到方法
--allen
2. re: 第二章 Visual Basic 基础语法
受益匪浅,多谢!
--yuleself
3. re: 数字填空
评论内容较长,点击标题查看
--去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去
4. re: Checkbox联动演示
dcdc
--dcd
5. re: 利用TN5250NF下載檔案的自動化處理
請教若密碼要動態生成,是否有辦法呢?
謝謝
--江佳桂
i am ddm