Skip to content

Latest commit

 

History

History
61 lines (46 loc) · 3.32 KB

pyalink-dataframe.md

File metadata and controls

61 lines (46 loc) · 3.32 KB

与 Dataframe 互操作

PyAlink 提供了与 pandas DataFrame 的互转操作,能够方便地使用 Python 生态中已有的强大工具。 pandas 的 DataFrame 是 Python 已有生态中表示、存储二维表的十分理想的选择,DataFrame 自身提供了一定的数据处理与可视化能力,同时又可以方便的转换为 Python 中的其他数据结构。

BatchOperator 转 Dataframe

对于各个 BatchOperator,提供了 collectToDataframe() 的成员方法,将 BatchOperator 内的 DataSet 转换为 DataFrame;直接使用 print() 方法也可以直接以 DataFrame 的形式进行打印。

source = CsvSourceBatchOp()\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
res = source.select(["sepal_length", "sepal_width"])
df = res.collectToDataframe()
# Operations with df
res.print()

BatchOperator 中也提供了静态方法 collectoToDataframes 可以对多个 BatchOperator 同时进行转换。 每一次单个 BatchOperator 调用 collectoToDataframe 都会导致作业图进行一次计算,多次调用时将有效率问题,更重要的是,有些算法在多次调用时可能结果不一致,例如采样操作等。 此时可以使用这个静态方法来解决这些问题。

source = CsvSourceBatchOp() \
    .setSchemaStr(
    "sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")

split = SplitBatchOp().setFraction(0.1).linkFrom(source)
[d0, d1, d2] = collectToDataframes(source, split, split.getSideOutput(0))
print(d0, d1, d2)

Dataframe 转 BatchOperator/StreamOperator

对于用户拥有的 Dataframe 数据,PyAlink 也提供了向 BatchOperator/StreamOperator 转换的方法。 具体说来,BatchOperator/StreamOperator 中都具有静态方法 fromDataframe(df, schemaStr),其中 df 是 DataFrame 数据, schemaStr 是由数据列名和类型构成的 schema 字符串。

schema = "f_string string,f_long long,f_int int,f_double double,f_boolean boolean"
op = BatchOperator.fromDataframe(df, schema)
op.print()

op = StreamOperator.fromDataframe(df, schema)
op.print(key="op")
StreamOperator.execute()

同时,PyAlink 也提供了静态方法来进行转换:dataframeToOperator(df, schemaStr, opType),这里 dfschemaStr 参数与上文相同,opType 取值为 "batch""stream"

使用注意

  • 从 BatchOperator 向 Dataframe 转换时:如果 BatchOperator 所存储的数据中有空值 null,那么转换后的 Dataframe 将无法保证完全转换为 BatchOperator 的列类型。此时,对应的列需要手动进行处理。
  • 从 Dataframe 向 BatchOperator 转换时:如果 Dataframe 中有空值时,需要预先替换为 None,这样才能正确地进行之后的处理。
  • 当数据量大时,由于数据转换需要进行大量的 Python 与 Java 数据之间的交换,效率将会受到影响。