Skip to content

Flow Config(task.xml)

成文 edited this page Jul 24, 2024 · 18 revisions
<config>  
<!--- pipe configure -->
<dataflow alias="test(实例别名。注:系统默认使用文件夹名)">  
	<TransParam>
                        <!--- pipe control parameters -->
			<param>
				<name>logLevel</name>
				<value>0</value>
                                <!--- 日志级别,默认0输出所有info日志,1输出统计类info日志 2 关闭 -->
			</param>
			<param>
				<name>remark</name>
				<value>实现特征提取</value>
				<!--- 备注说明 -->
			</param> 
			<param>
				<name>failFreq</name>
				<value>5</value>
                                <!--- 实例最大运行异常频率(每秒异常次数),超过该频率将断开管道,如果错误频繁不想断开增大该值 -->
			</param>
			<param>
				<name>maxFailTime</name>
				<value>100</value>
                                <!--- 实例最大运行错误次数,超过该错误次数将断开管道 -->
			</param>
			<param>
				<name>priority</name>
				<value>9</value>
                                <!--- 运行优先级,数字越高优先级越高,默认9,资源不足时优先级低的任务自动不断缩小管道大小,searcher端自动对实例服务降级 -->
			</param>
			<param>
				<name>fullCron</name>
				<value>全量表达式(0 5 23 2 1 ?)</value>
                                <!--- 全量任务不配置则默认启动一个无效全量任务占位。 -->
			</param>
			<param>
				<name>deltaCron</name>
				<value>增量表达式(0 1/5 * * * ?)</value>
                                <!--- 增量任务不配置则默认启动一个无效增量任务占位。 -->
			</param> 
			<param>
				<name>writeTo</name>
				<value>写入位置</value>
				<type>类型(如mysql,表示该任务可支持的写端类型)</type>
                     <!-- resource中的资源名称(当使用time写机制支持多源均衡写入,此时可以配置多个资源使用","分隔)-->                   
			</param>
			<param>
				<name>referenceInstance</name>
				<value>参照实例名称</value>
                                <!--- 配置后任务由参照任务控制,该功能可用于控制多管道数据写入同端 -->
			</param>
			<param>
				<name>virtualPipe</name>
				<value>[true|false] 默认false</value>
                                <!--- true表示是虚管道,将只控制写位置,不建立流管道。可用于解决多管道流入同端实现流末端同步控制,
                     需要nextJob参数配合才能实现该功能,配合referenceInstance参数可以控制多任务写入统一位置。-->
			</param>
			<param>
				<name>nextJob</name>
				<value>自定义关联job(例:instance1,instance2)</value>
                                <!--- 控制其运行后启动job列表,DAG通过该功能实现。 -->
			</param>
                        <param>
				<name>async</name>
				<value>true/false<value>
                                <!--- 默认false,即nextJob任务同步执行,按顺序依次执行,true则nextJob并发执行。 -->
			</param>
                       <!--- pipe start parameters -->
			<param> 
				<name>readPageSize</name> 
				<value>10000</value> 
                                <!--- 读取管道每次抽取的数据量 -->
			</param>
			<param> 
				<name>readFrom</name> 
				<value>读取位置</value> 
				<type>类型(如mysql,表示该任务可支持的读端类型)</type>
			</param>
                        <param>
				<name>readerPoolShareAlias</name>
				<value>true/false</value>
                                <!--- 当多个任务对源进行操作时,建立的链接是否通过alias别名进行连接池共享,默认false -->
			</param>
			<param>
				<name>customReader</name>
				<value>org.elasticflow.reader.flow.example</value>
				<!--- 自定义reader读取处理器,可定义plugin方式载入 -->
			</param> 

                       <!--- pipe end parameters -->
                       <param>
                             <name>writeMechanism</name>
                             <value>写机制(ab/time/norm)</value>
			     <!--- ab机制主备切换模式只维持一个线上实例,time机制依照时间间隔生成新实例,norm机制单一实例 -->
                       </param>
                       <param>
                             <name>keepNums</name>
                             <value>30d(可选值d天,w周,m月份)</value>
			     <!--- writeMechanism为time时,参数可配置数据保留间隔,默认30d即按天存储每天一个实例维持30个规模,
                                  30m即按月处理,每月一个实例维持30个规模 -->
                       </param>
                       <param>
				<name>writeType</name>
				<value>[full|increment],控制单条记录写入方式,覆盖或(增量)更新</value>
		       </param>
                       <param>
				<name>writerPoolShareAlias</name>
				<value>true/false(跨alias别名进行实例连接池共享,默认false)</value>
			</param>
                       <param>
				<name>customWriter</name>
                                <value>org.elasticflow.writer.flow.example</value>
				<!--- 自定义Writer数据写入处理器,可定义plugin方式载入 --> 
			</param>
                        <param>
                             <name>multiThread</name>
                             <value>[true|false] 默认false,不启用多线程</value>
                        </param>
		 
                        <!--- pipe search service parameters -->
                         <param>
				<name>searchFrom</name>
				<value>搜索数据位置(可选)</value>
			</param>  
			<param>
				<name>searcherShareAlias</name>
				<value>[true|false] (跨alias别名进行实例连接池共享,默认true)</value>
			</param>
                        <param>
				<name>customSearcher</name>
                                <value>org.elasticflow.searcher.flow.example</value>
				<!--- 自定义searcher数据搜索器,可定义plugin方式载入 -->
			</param>  
		</TransParam>

                <ReaderParam>   
                        <param>
				<name>keyField</name>
				<value>id</value>
				<!--- 扫描主键字段,使用唯一值字段 -->
			</param>
                        <param>
				<name>L2seqs</name>
				<value>1,2,3</value>
				<!--- 二级seq列表,可以轮流替换dataScanDSL中的#{seq},生成多个L2seq子任务 -->
			</param>
			<param>
				<name>scanField</name>
				<value>update_time</value>
				<!--- 增量控制字段,通常使用更新时间 -->
			</param>
			<param>
				<name>customParams</name>
				<value>{json}</value>
				<!--- 用户自定义json参数,可用于扩展plugin -->
			</param>
			<param>
				<name>dataScanDSL</name>
				<value><![CDATA[ select * from xxx... where id>#{page_start} and id<=#{page_end} and update_time>=#{start_time} ]]></value>
				<!--- 数据扫描运行内容 -->
			</param>
			<param>
				<name>pageScanDSL</name>
				<value>写法类似dataScanDSL区域配置</value>
				<!--- 定义分页pageScan,其分页依据为scanField中定义的字段的值 -->
			</param>
			<param>
				<name>scanFieldType</name>
				<value>time:second(默认)</value>
				<!--- (data|time):(data配置y-m-d格式,time配置second|millisecond),扫描字段类型时间戳还是日期,进行配置 -->
			</param>
			<param>
				<name>handler</name>
				<value>数据读取用户自定义插件类</value>
				<dsl><!--- json格式自定义插件参数 --></dsl> 
			</param>
			<fields>//reader fields defined 
<field 
name="对应外部读取的字段域名称" 
handler="跨字跨段域处理器,可跨字段域处理(如:org.elasticflow.flow.unit.handler.ToTimestamp,可为空)" 
router="true/false(是否为路由域,可为空)"  
indexed="是否索引(true/false)" 
stored="是否存储(true/false)" 
indextype="数据类型(根据写入位置自由定义,可以支持按值条件运行,写法参考neo4j流实现部分)" 
separator="数据分隔符,可由字符串转数组(可为空)"
analyzer="分析器(可为空)" 
defaultvalue="域默认填充值"
dsl="域额外扩展信息,用于配置字段,{json字符串}(可为空)"
paramtype="字跨段域处理器(org.elasticflow.field.handler.LongRangeType),目标端值处理,可不写" 
customParams="自定义json参数,用于对自定义plugin时配置参数等,例如{'baseurl':'http://xxx.xxx.xx'}(可为空)"
/> 
			</fields>
             </ReaderParam> 

             <ComputerParam>   
			<param>
				<name>computeMode</name>
				<value>blank/rest/model</value>
				<!--- 计算模式,rest api 或者加载python模型 -->
			</param>
			<param>
				<name>api</name>
				<value>http://121.111.111.11:80/predict,http://121.111.111.12:80/predict</value>
				<!--- 提供计算服务的Rest地址,配置后训练配置项都无效 -->
			</param>
			<param>
				<name>apiRequestMaxDatas</name>
				<value>60</value>
				<!--- 定义request post中的最大数据条数 -->
			</param>
			<param>
				<name>apiRequest</name>
				<value><![CDATA[ 
				{"data":{"usefieldname":"field","handler":"处理方法(java.lang.String)","type":"list/single"}}
				]]></value>
				<!--- 定义request post的json数据,仅支持POST/JSON类型API,且接口必须支持batch提交 -->
			</param>
			<param>
				<name>apiResponse</name>
				<value><![CDATA[ 
				{"dataField":"data","type":"list"} 
				]]></value> 
				<!--- 定义Response数据形式,数据必须是列表形式 -->
				<!--- data为reponse json中的存储数据的字段,内容必须json array形式 -->
				<!--- type list表示内部数据是数组需要拆分形成多条记录-->
			</param>
			<param>
				<name>handler</name>
				<value>数据写入用户自定义插件类</value>
				<dsl><!--- json格式自定义插件参数 --></dsl> 
			</param>
			<param>
				<name>customParams</name>
				<value>{json}</value>
				<!--- 用户自定义json参数,可用于扩展plugin -->
			</param>
                        <param>
				<name>keyField</name>
				<value>id</value>
				<!--- 扫描主键字段,使用唯一值字段 -->
			</param> 
			<param>
				<name>scanField</name>
				<value>update_time</value>
				<!--- 增量控制字段,通常使用更新时间 -->
			</param> 
			//定义模型处理方法			 
			<param>
				<name>pyPath</name>
				<value>/work/nlp</value>
				<!--- 模型预测的python主入口文件夹地址 -->
			</param> 
			<fields>//Compute fields defined
				<field 
				     name="对应reader字段域名称" 
				     handler="跨字跨段域处理器,可跨字段域处理(如:org.elasticflow.flow.unit.handler.ToTimestamp,可为空)" 
				     router="true/false(是否为路由域,可为空)"  
				     indexed="是否索引(true/false)" 
				     stored="是否存储(true/false)" 
				     indextype="数据类型(根据写入位置自由定义,可以支持按值条件运行,写法参考neo4j流实现部分)" 
				     separator="数据分隔符,可由字符串转数组(可为空)"
				     analyzer="分析器(可为空)" 
				     defaultvalue="域默认填充值"
				     dsl="域额外扩展信息,用于配置字段,{json字符串}(可为空)"
				     paramtype="字段域处理器(org.elasticflow.field.handler.LongRangeType),目标端值处理,可不写" 
				     customParams="自定义json参数,用于对自定义plugin时配置参数等,例如{'baseurl':'http://xxx.xxx.xx'}(可为空)"
				/> 
			</fields>
</ComputerParam> 

<WriterParam>
   <param>
	<name>writeKey</name>
	<value>依赖键</value>
   </param>
   <param>
	<name>keyType</name>
	<value>类型(scan/unique)</value>
   </param>
   <param>
	<name>storageStructure</name>
	<value> <![CDATA[ 结构描述json ]]></value>
   </param>
  <param>
	<name>dslParse</name>
	<value>normal/condition</value>
   </param>
   <param>
	<name>customParams</name>
	<value>
{"attribute":"
MERGE (n:#{subject} {_id: #{subject_id}}) SET n.#{predicate} = '#{object}', n._id= #{subject_id}
",
"relation":" 
MERGE (n:#{subject} {_id: #{subject_id}}) SET n._id = #{subject_id} 
MERGE (m:#{object} {_id: #{object_id}}) SET m._id = #{object_id} 
MERGE (n)-[r:#{predicate}]->(m)
"
}
	</value>
<!--- 用户自定义json参数,可用于扩展plugin -->
   </param>
  <fields>//write fields defined
   <field 
name="对应reader或computer字段域名称" 
handler="跨字段域处理器,可跨字段域处理(如:org.elasticflow.flow.unit.handler.ToTimestamp,可为空)" 
router="true/false(是否为路由域,可为空)"  
indexed="是否索引(true/false)" 
stored="是否存储(true/false)" 
indextype="数据类型(根据写入位置自由定义,可以支持按值条件运行,写法参考neo4j流实现部分)" 
separator="数据分隔符,可由字符串转数组(可为空)"
analyzer="分析器(可为空)" 
alias="存储名称,(不设置则默认为name值,可不配置,但不能空)"
defaultvalue="域默认填充值"
customParams="自定义json参数,可用于扩展plugin的参数配置,例如{'baseurl':'http://xxx.xxx.xx'}(可为空)"
dsl="域额外扩展信息,{json字符串},例如{'dims':'100'}(可为空)"
paramtype="字段域处理器(org.elasticflow.field.handler.LongRangeType),默认java.lang.String处理,可不写" /> 

   <field name="geodata" stored="false" indexed="true" indextype="geo_point" analyzer="" alias="geo" paramtype="java.lang.String" separator="," />
   <field name="update_time" indexed="true" stored="false" indextype="long" alias="update_time" paramtype="org.elasticflow.field.handler.LongRangeType" /> 
   <field>
    <name>geodata</name>
    <stored>false</stored>
    <dsl>{}</dsl>
    ...
   </field>
  </fields>
</WriterParam>


<SearcherParam> 
//定义搜索参数及默认值 
<param> 
<name>定义查询字段</name>
<includeLower>是否包含下界(true/false),用于范围查询字段</includeLower>
<includeUpper>是否包含上界(true/false),用于范围查询字段</includeUpper>
<paramtype>传入类型限制</paramtype>
<fields>(存储字段)paramname_1,...,paramname_n</fields> 
</param>
	
</SearcherParam>

</dataflow>   

</config>