大大毛 的笔记

  DDM's Note

哪怕没有办法一定有说法,
就算没有鸽子一定有乌鸦,
固执无罪 梦想有价,
让他们惊讶.

posts - 14, comments - 23, trackbacks - 0, articles - 58
   :: 首页 ::  :: 联系 ::  :: 管理
需求:
    比较前一篇文章来说,仅加多Delete的行为。例如我仅需要Status=1的资料,所以对于资料落地来讲,最合适的莫过于下面这样,Table也可以自动保持最少量的有效资料
    1. 新资料Status=0,执行Insert;
    2. 资料修改Status=0,执行Update;
    3. 资料状态变更Status=1,执行Delete;
    4. 若资料状态重新变更为0,则又会执行Insert;

思路:
    按理来说,只要通过分支Route就可以将Insert/Update与Delete作业分成两条Nifi支流(看网上确实有很多这么整法的),但是用Route有一个问题处理不了,那就是资料顺序的正确性你是无法保证的。对于小数据量的场景来说,每笔Key的多次操作间隔可能会比较长,所以它不会有什么问题,但大数据量的情况下,两同相同Key值的资料走Route后被处理的顺序混乱就会造成最终资料结果的异常(比如应该是先Insert再Delete,结果却是发现资料还躺在Table中)。而大数据量在使用Kafka做为数据源时就不可避免会出现:即使业务数据量确实不大,但对于积累了好几天的数据再进行接收时,那一瞬间的数据量也会是很大的。
   所以我们能做的就是动态决定执行Delete和Insert。

解决方案:
虽然与前一篇来说差异不大,但Nifi流程上却有很大不同,下面会详细描述为什么要这样做



Processor及其设定:
    ConsumeKafkaRecord、SplitJson、Connection、EvaluateJsonPath与前一章的一样,只是不同数据下解析的属性有所不同,这里不再详述。

    UpdateAttribute作用是从Kafka中Consume出资料(以Record的形态),这里使用Record是因为源数据就是以Record的方式存上去的 (Avro Schema)
  • SQL1:这才是这次花招的关键,在这里根据STATUS自行构建SQL语句
    • Status=0:构建的就是Delete From xxx Where Key1=? and Key2=? 这样的删除语句
    • Status!=0:构建的就是Replace Into xxx (Key1,Key2,col3,col4) Values ( ?, ?, ?, ?) 这样的Insert or Update语句
  • 经此Processor处理后,资料落地所需的SQL就构建好了,后续的问题就是如何去绑定参数和执行

    ReplaceText作用是对FlowFile进行文本替换,这里使用它来直接产生我所需的JSON内容
  • Search Value:这里使用的是Default的(?s)(^.*$),作用就是把原先的整份文件全部换掉
  • Replacement Value:这里放的就是一个固定结构的JSON,可以看到里面的属性值都是使用的Attribute (它们的值来源于前面的EvaluateJsonPath从源JSON文件中的提取)
    • 细心的朋友可以发现这里是与前一篇文章的最大不同,这里没有使用AttributeToJson去直接产生JSON文件,而使用的是更加笨拙的方式
      • 前面的文章有提过,我们产生的Attribute以及AttributeToJson所生成JSON中各属性的顺序问题,结论是怎么搞它都不是我所想象到的顺序。但是ConvertJsonToSQL这个东东却很实在,它确确实实是按JSON中属性的顺序去生成的SQL以及参数名称(还记得参数名称sql.args.1.value中的这个顺序1么),所以问题就来了:
        • SQL由于必须要有Delete和Replace,所以它们的参数个数一定是不同的,而Delete压的参数又是我们的Key,所以就必须要保证ConvertJsonToSQL生成属性的顺序,这样我们才能够保证我们的两个Key一定会是sql.args.1和sql.args.2
        • 换句话说,如果AttributesToJson若是能够保证JSON属性顺序的话,那就不用这么费劲

        ConvertJsonToSQL与前文一样,以Insert的方式生成SQL和绑定参数即可

        UpdateAttribute终于用到了它的Delete功能,作用是清除掉多余的SQL绑定参数

    • Delete Attributes Expression:这里我根据Delete的条件(STATUS=0)去删除多余的SQL绑定参数
      • 这里的写法比较死,我Hard-code删除掉大与2的其它所有参数(" * "是一个通配符," | "是一个多条件的间隔符),感觉上还有更好的写法
    • 至此我们就可以保证绑定参数的数量与SQL语法参数个数一致 (不一致它死给你看)

          PutSQL这里仍然是执行SQL,这里使用配置参数的形式让它执行我们的SQL

      • SQL Statement:前面用UpdateAttribute产生的SQL1参数,它会根据STATUS=0去判断是使用DELETE还是REPLACE语法
        • 这个属性压上后,无论SQL1是不是为空,这个组件都不会再去管FlowFile的内容(空属性时是把FlowFile的内容当成SQL去执行的)

      只有注册用户登录后才能发表评论。


      网站导航:
       

      i am ddm