-
Notifications
You must be signed in to change notification settings - Fork 8
Flow Config(task.xml)
成文 edited this page Mar 25, 2022
·
18 revisions
<config>
<!--- pipe configure -->
<dataflow alias="test(实例别名,系统默认使用文件夹名,此处可改变规则)">
<TransParam>
<!--- pipe control parameters -->
<param>
<name>logLevel</name>
<value>0</value>
<!--- 日志级别,默认0输出所有info日志,1输出统计类info日志 2 关闭 -->
</param>
<param>
<name>failFreq</name>
<value>5</value>
<!--- 实例最大运行异常频率(每秒异常次数),超过该频率将断开管道 -->
</param>
<param>
<name>maxFailTime</name>
<value>100</value>
<!--- 实例最大运行错误次数,超过该错误次数将断开管道 -->
</param>
<param>
<name>priority</name>
<value>9</value>
<!--- 运行优先级,数字越高优先级越高,默认9,资源不足时优先级低的任务自动不断缩小管道大小,searcher端自动对实例服务降级 -->
</param>
<param>
<name>fullCron</name>
<value>全量表达式(0 5 23 2 1 ?)</value>
<!--- 全量任务不配置则默认启动一个无效全量任务占位。 -->
</param>
<param>
<name>deltaCron</name>
<value>增量表达式(0 1/5 * * * ?)</value>
<!--- 增量任务不配置则默认启动一个无效增量任务占位。 -->
</param>
<param>
<name>writeTo</name>
<value>写入位置</value>
<!-- resource中的资源名称(当使用time写机制支持多源均衡写入,此时可以配置多个资源使用","分隔)-->
</param>
<param>
<name>referenceInstance</name>
<value>参照实例名称</value>
<!--- 配置后任务由参照任务控制,该功能可用于控制多管道数据写入同端 -->
</param>
<param>
<name>virtualPipe</name>
<value>[true|false] 默认false</value>
<!--- true表示是虚管道,将只控制写位置,不建立流管道。可用于解决多管道流入同端实现流末端同步控制,
需要nextJob参数配合才能实现该功能,配合referenceInstance参数可以控制多任务写入统一位置。-->
</param>
<param>
<name>nextJob</name>
<value>自定义关联job(例:instance1,instance2)</value>
<!--- 控制其运行后启动job列表,DAG通过该功能实现。 -->
</param>
<param>
<name>async</name>
<value>true/false<value>
<!--- 默认false,即nextJob任务同步执行,按顺序依次执行,true则nextJob并发执行。 -->
</param>
<!--- pipe start parameters -->
<param>
<name>readPageSize</name>
<value>10000</value>
<!--- 读取管道每次抽取的数据量 -->
</param>
<param>
<name>readFrom</name>
<value>读取位置</value>
</param>
<param>
<name>readerPoolShareAlias</name>
<value>true/false</value>
<!--- 当多个任务对源进行操作时,建立的链接是否通过alias别名进行连接池共享,默认false -->
</param>
<param>
<name>customReader</name>
<value>org.elasticflow.reader.handler.example</value>
<!--- 自定义readFrom数据读取处理器 -->
</param>
<!--- pipe end parameters -->
<param>
<name>writeMechanism</name>
<value>写机制(ab/time/norm)</value>
<!--- ab机制主备切换模式只维持一个线上实例,time机制依照时间间隔生成新实例,norm机制单一实例 -->
</param>
<param>
<name>keepNums</name>
<value>30d(可选值d天,m月份)</value>
<!--- writeMechanism为time时,参数可配置数据保留间隔,默认30d即按天存储30个实例,30m即按月处理 -->
</param>
<param>
<name>writeType</name>
<value>[full|increment],控制单条记录写入方式,覆盖或(增量)更新</value>
</param>
<param>
<name>writerPoolShareAlias</name>
<value>true/false(跨alias别名进行实例连接池共享,默认false)</value>
</param>
<param>
<name>customWriter</name>
<value>自定义writeTo数据写入处理器</value>
</param>
<param>
<name>multiThread</name>
<value>[true|false] 默认false,不启用多线程</value>
</param>
<!--- pipe search service parameters -->
<param>
<name>searchFrom</name>
<value>搜索数据位置(可选)</value>
</param>
<param>
<name>searcherShareAlias</name>
<value>[true|false] (跨alias别名进行实例连接池共享,默认true)</value>
</param>
<param>
<name>customSearcher</name>
<value>自定义searcher数据处理器</value>
</param>
</TransParam>
<ReadParam>
<param>
<name>keyField</name>
<value>id</value>
<!--- 扫描主键字段,使用唯一值字段 -->
</param>
<param>
<name>L2seqs</name>
<value>1,2,3</value>
<!--- 二级seq列表,可以轮流替换dataScanDSL中的#{seq},生成多个L2seq子任务 -->
</param>
<param>
<name>scanField</name>
<value>update_time</value>
<!--- 增量控制字段,通常使用更新时间 -->
</param>
<param>
<name>customParams</name>
<value>{json}</value>
<!--- 用户自定义json参数,可用于扩展plugin -->
</param>
<param>
<name>dataScanDSL</name>
<value><![CDATA[ select * from xxx... where id>#{page_start} and id<=#{page_end} and update_time>=#{start_time} ]]></value>
<!--- 数据扫描运行内容 -->
</param>
<param>
<name>pageScanDSL</name>
<value>写法类似dataScanDSL区域配置</value>
<!--- 定义分页pageScan,其分页依据为scanField中定义的字段的值 -->
</param>
<param>
<name>scanFieldType</name>
<value>time:second(默认)</value>
<!--- (data|time):(data配置y-m-d格式,time配置second|millisecond),扫描字段类型时间戳还是日期,进行配置 -->
</param>
<param>
<name>handler</name>
<value>数据扫描辅助自定义类</value>
<!--- 需要自行更改reader端 -->
</param>
<fields>//read fields defined
</fields>
</ReadParam>
<ComputeParam>
<param>
<name>computeModel</name>
<value>[flow|batch]</value>
<!--- (flow模式将自动在建立一个reader指向writer同样位置,单条处理方式) -->
</param>
<param>
<name>api</name>
<value>http://121.111.111.11:80/predict,http://121.111.111.12:80/predict</value>
<!--- 提供计算服务的Rest地址,配置后训练配置项都无效 -->
</param>
<param>
<name>apiRequestMaxDatas</name>
<value>60</value>
<!--- 定义request post中的最大数据条数 -->
</param>
<param>
<name>apiRequest</name>
<value><![CDATA[
{"data":{"usefieldname":"field","handler":"处理方法(java.lang.String)","type":"list/single"}}
]]></value>
<!--- 定义request post的json数据,仅支持POST/JSON类型API,且接口必须支持batch提交 -->
</param>
<param>
<name>apiResponse</name>
<value><![CDATA[
{"dataField":"data","type":"list"}
]]></value>
<!--- 定义Response数据形式,数据必须是列表形式 -->
<!--- data为reponse json中的存储数据的字段,内容必须json array形式 -->
<!--- type list表示内部数据是数组需要拆分形成多条记录-->
</param>
<param>
<name>customParams</name>
<value>{json}</value>
<!--- 用户自定义json参数,可用于扩展plugin -->
</param>
<param>
<name>keyField</name>
<value>id</value>
<!--- 扫描主键字段,使用唯一值字段 -->
</param>
<param>
<name>scanField</name>
<value>update_time</value>
<!--- 增量控制字段,通常使用更新时间 -->
</param>
<!--- start 用于训练(train)模型时需要的配置项 -->
<param>
<name>features</name>
<value>feature1,feature2</value>
<!--- 特征字段名使用”,“分隔 -->
</param>
<param>
<name>value</name>
<value>label</value>
<!--- 标签字段名使用”,“分隔 -->
</param>
<param>
<name>algorithm</name>
<value>LogisticRegression</value>
<!--- 算法名称 -->
</param>
<param>
<name>learn_rate</name>
<value>0.5</value>
<!--- 学习率 -->
</param>
<param>
<name>threshold</name>
<value>0.001</value>
<!--- 收敛值 -->
</param>
//定义阶段处理方法
<param>
<name>preprocessing</name>
<value>预处理函数</value>
</param>
<param>
<name>postprocessing</name>
<value>后处理处理函数</value>
</param>
<!--- end 用于训练(train)模型时需要的配置项 -->
<param>
<name>stage</name>
<value>train/test/predict</value>
<!--- 定义使用模式 -->
</param>
<fields>//comupte fields defined
</fields>
</ComputeParam>
<WriteParam>
<param>
<name>writeKey</name>
<value>依赖键</value>
</param>
<param>
<name>keyType</name>
<value>类型(scan/unique)</value>
</param>
<param>
<name>storageStructure</name>
<value> <![CDATA[ 结构描述json ]]></value>
</param>
<param>
<name>dslParse</name>
<value>normal/condition</value>
</param>
<param>
<name>customParams</name>
<value>
{"attribute":"
MERGE (n:#{subject} {_id: #{subject_id}}) SET n.#{predicate} = '#{object}', n._id= #{subject_id}
",
"relation":"
MERGE (n:#{subject} {_id: #{subject_id}}) SET n._id = #{subject_id}
MERGE (m:#{object} {_id: #{object_id}}) SET m._id = #{object_id}
MERGE (n)-[r:#{predicate}]->(m)
"
}
</value>
<!--- 用户自定义json参数,可用于扩展plugin -->
</param>
<fields>//write fields defined
<field
name="对应reader字段域名称"
handler="进入管道前域处理器,可跨字段域处理(如:org.elasticflow.writerUnit.handler,可为空)"
router="true/false(是否为路由域,可为空)"
indexed="是否索引(true/false)"
stored="是否存储(true/false)"
indextype="数据类型(根据写入位置自由定义,可以支持按值条件运行,写法参考neo4j流实现部分)"
separator="数据分隔符,可由字符串转数组(可为空)"
analyzer="分析器(可为空)"
alias="存储名称,(不设置则默认为name值,可不配置,但不能空)"
defaultvalue="域默认填充值"
dsl="域额外扩展信息,{json字符串},例如{'dims':'100'}(可为空)"
customParams="用户自定义json参数,可用于扩展plugin,例如{'baseurl':'http://xxx.xxx.xx'}(可为空)"
paramtype="写入目标端时的处理器(org.elasticflow.field.handler.LongRangeType),searcher、writer端都可使用,默认java.lang.String处理,可不写" />
<field name="geodata" stored="false" indexed="true" indextype="geo_point" analyzer="" alias="geo" paramtype="java.lang.String" separator="," />
<field name="update_time" indexed="true" stored="false" indextype="long" alias="update_time" paramtype="org.elasticflow.field.handler.LongRangeType" />
<field>
<name>geodata</name>
<stored>false</stored>
<dsl>{}</dsl>
...
</field>
</fields>
</WriteParam>
<SearchParam>
//定义搜索参数及默认值
<param>
<name>定义查询字段</name>
<includeLower>是否包含下界(true/false),用于范围查询字段</includeLower>
<includeUpper>是否包含上界(true/false),用于范围查询字段</includeUpper>
<paramtype>传入类型限制</paramtype>
<fields>(存储字段)paramname_1,...,paramname_n</fields>
</param>
<param>
...
</param>
</SearchParam>
</dataflow>
</config>
ElasticFlow