Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]:从spu层面做mpc, 数据是如何传输的? #902

Open
notinghere opened this issue Nov 5, 2024 · 25 comments
Open

[Feature]:从spu层面做mpc, 数据是如何传输的? #902

notinghere opened this issue Nov 5, 2024 · 25 comments

Comments

@notinghere
Copy link

Feature Request Type

Usability

Have you searched existing issues?

Yes

Is your feature request related to a problem?

no

Describe features you want to add to SPU

一 步骤

  1. 在 sml/metrics/regression/regression_test.py 代码中,发现实际使用了模拟器 sim = spsim.Simulator.simple(
    3, spu_pb2.ProtocolKind.ABY3, spu_pb2.FieldType.FM64)。
  2. 在模拟器中 spu/utils/simulation.py 中,函数实际调用的时候。会将双方数据都传入,然后做 params = [
    self.io.make_shares(x, spu_pb2.Visibility.VIS_SECRET) for x in flat_args]。
  3. 模拟器执行job的时候,双方可以直接将双方makeshare以后的数据直接 set_var 进行计算。
def __call__(self, executable, *flat_args):
        flat_args = [np.array(jnp.array(x)) for x in flat_args]
        params = [
            self.io.make_shares(x, spu_pb2.Visibility.VIS_SECRET) for x in flat_args
        ]

        lctx_desc = libspu.link.Desc()
        for rank in range(self.wsize):
            lctx_desc.add_party(f"id_{rank}", f"thread_{rank}")

        def wrapper(rank):
            lctx = libspu.link.create_mem(lctx_desc, rank)
            rank_config = spu_pb2.RuntimeConfig()
            rank_config.CopyFrom(self.rt_config)
            if rank != 0:
                # rank_config.enable_pphlo_trace = False
                rank_config.enable_action_trace = False
                rank_config.enable_hal_profile = False
                rank_config.enable_pphlo_profile = False
            rt = spu_api.Runtime(lctx, rank_config)

            # do infeed.
            for idx, param in enumerate(params):
                rt.set_var(executable.input_names[idx], param[rank])

            # run
            rt.run(executable)

            # do outfeed
            return [rt.get_var(name) for name in executable.output_names]

        jobs = [
            PropagatingThread(target=wrapper, args=(rank,))
            for rank in range(self.wsize)
        ]

        [job.start() for job in jobs]
        parties = [job.join() for job in jobs]

        outputs = zip(*parties)
        return [self.io.reconstruct(out) for out in outputs]

二 问题

  1. 假设2方进行mpc。计算的函数为:
def array_sum(array1,array2):
    return jnp.add(array1,array2)

双方数据为 data_alice 和 data_bob, 双方需要通过网络进行交互。此时应该如何传入数据呢?

Describe features you want to add to SPU

  • 多方传入数据和方法,可以直接计算结果。
@tpppppub
Copy link
Member

tpppppub commented Nov 5, 2024

make_shares 和 set_var 是 spu io 的 api。在生产环境请使用 secretflow 参考 secretflow 的用法,secretflow 会屏蔽这些细节。

@notinghere
Copy link
Author

make_shares 和 set_var 是 spu io 的 api。在生产环境请使用 secretflow 参考 secretflow 的用法,secretflow 会屏蔽这些细节。

如果仅通过spu 来做,这里的通信应该如何来做?有没有例子可参考?

@tpppppub
Copy link
Member

tpppppub commented Nov 6, 2024

请参考 examples 下的例子,SPU distributed 也做了一个类 ray 的封装,用户不用直接调这些 io api

@notinghere
Copy link
Author

请参考 examples 下的例子,SPU distributed 也做了一个类 ray 的封装,用户不用直接调这些 io api

我执行了一下这个例子

# Start nodes.
# > bazel run -c opt //examples/python/utils:nodectl -- up
#
# Run this example script.
# > bazel run -c opt //examples/python:millionaire

第二步骤的时候会报错

···
INFO: Running command line: bazel-bin/examples/python/millionaire
Traceback (most recent call last):
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/examples/python/millionaire.py", line 40, in
ppd.init(conf["nodes"], conf["devices"])
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 1171, in init
_CONTEXT = HostContext(nodes_def, devices_def)
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 1091, in init
self.devices[name] = SPU(
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 1006, in init
results = [future.result() for future in futures]
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 1006, in
results = [future.result() for future in futures]
File "/home/evan/miniconda3/envs/venv3.10/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/home/evan/miniconda3/envs/venv3.10/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/evan/miniconda3/envs/venv3.10/lib/python3.10/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 247, in run
return self._call(self._stub.Run, fn, *args, **kwargs)
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 236, in _call
rsp_data = rebuild_messages(rsp_itr.data for rsp_itr in rsp_gen)
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 214, in rebuild_messages
return b''.join([msg for msg in msgs])
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 214, in
return b''.join([msg for msg in msgs])
File "/home/evan/.cache/bazel/_bazel_evan/4a656ca15a246bdf6bb10b4bec038b57/execroot/spulib/bazel-out/k8-opt/bin/examples/python/millionaire.runfiles/spulib/spu/utils/distributed_impl.py", line 236, in
rsp_data = rebuild_messages(rsp_itr.data for rsp_itr in rsp_gen)
File "/home/evan/miniconda3/envs/venv3.10/lib/python3.10/site-packages/grpc/_channel.py", line 543, in next
return self._next()
File "/home/evan/miniconda3/envs/venv3.10/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:61920: Failed to connect to remote host: connect: Connection refused (111)"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:61920: Failed to connect to remote host: connect: Connection refused (111)", grpc_status:14, created_time:"2024-11-06T16:45:23.236415463+08:00"}"

···

@tpppppub
Copy link
Member

tpppppub commented Nov 6, 2024

运行第二行命令需要保持第一行命令一直运行

@notinghere
Copy link
Author

notinghere commented Nov 6, 2024

运行第二行命令需要保持第一行命令一直运行
好了,有个地方我搞错了。谢谢老师

@notinghere
Copy link
Author

运行第二行命令需要保持第一行命令一直运行

你好,老师,我看用例里面,都是单方发起,数据也都是单方传入,有没有多方传入参数的用例?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

和 secretflow 一样,用例只是一个 dirver 程序,p1 和 p2 就是在不同方执行的

@notinghere
Copy link
Author

和 secretflow 一样,用例只是一个 dirver 程序,p1 和 p2 就是在不同方执行的

以 examples/python/millionaire.py 这个为例。最后z = compare(x, y) 函数由哪一方执行,x,y 由于属于不同参与方,数据如何汇聚到一方呢?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

@ppd.device 这个 decorator 已经标明了这个函数在哪个 device 执行。数据如何进入 SPU runtime 运行就是基于 make_shares、set_var 等 SPU IO 实现的,这里已经屏蔽了相应细节,用户无感。如果想了解细节请参考 SPU device 的代码。

@notinghere
Copy link
Author

@ppd.device 这个 decorator 已经标明了这个函数在哪个 device 执行。数据如何进入 SPU runtime 运行就是基于 make_shares、set_var 等 SPU IO 实现的,这里已经屏蔽了相应细节,用户无感。如果想了解细节请参考 SPU device 的代码。

  • 以这个为例 examples/python/millionaire.py
# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Start nodes.
# > bazel run -c opt //examples/python/utils:nodectl -- up
#
# Run this example script.
# > bazel run -c opt //examples/python:millionaire


import argparse
import json
import logging

import jax.numpy as jnp
import numpy as np

import spu.utils.distributed as ppd

parser = argparse.ArgumentParser(description='distributed driver.')
parser.add_argument("-c", "--config", default="examples/python/conf/3pc.json")
args = parser.parse_args()

with open(args.config, 'r') as file:
    conf = json.load(file)

print(">>>> init")
ppd.init(conf["nodes"], conf["devices"])

@ppd.device("P1")
def rand_from_alice():
    logging.info("make a private variable on P1, it's known only for P1.")
    np.random.seed()
    return np.random.randint(100, size=(4,))


@ppd.device("P2")
def rand_from_bob():
    logging.info("make a private variable on P2, it's known only for P2.")
    np.random.seed()
    return np.random.randint(100, size=(4,))


@ppd.device("SPU")
def compare(x, y):
    logging.info("compute the max of two parameter, unknown for all parties.")
    return jnp.maximum(x, y)

x = rand_from_alice()
y = rand_from_bob()

# x & y will be automatically fetch by SPU (as secret shares)
# z will be evaluated as an SPU function.
z = compare(x, y)

print(f"z({type(z)} is a device object ref, we can not access it directly.")
print(f"use ppd.get to get the object from device to this host")
print(f"x = {ppd.get(x)}")
print(f"y = {ppd.get(y)}")
print(f"z = {ppd.get(z)}")

这段代码如果是部署在两方,P1 方只需要传入P1的数据,P2方传入P2的数据。

  1. compare 函数由哪一方传入。然后由哪一方触发执行compare 函数。怎么触发?
    例如:z = compare(x, y) ,此时P1,P2 方不具备完整的x,y 数据。 如果有spu 执行,P1 方用哪种方式触发执行?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

P1 和 P2 只需要启动节点,这段代码只是描述计算任务和触发计算任务的 driver

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

@notinghere
Copy link
Author

P1 和 P2 只需要启动节点,这段代码只是描述计算任务和触发计算任务的 driver

这个 driver 由哪一方启动?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

这个例子里任何一方都可以启动

@notinghere
Copy link
Author

这个例子里任何一方都可以启动

**多控制器执行的核心思路是每个参与方都运行同一份代码,但是仅执行代码中属于自己的部分。

假如我们把代码逻辑看成一个DAG(有向无环图),DAG的每个节点表示一段属于某个参与方的代码逻辑,边表示不同节点之间的数据流转,则多控制器执行原理可以如图所示。**

我大概明白了,相当于双方都执行这个代码,然后按照约定执行各自的部分。

{
    "id": "outsourcing.3pc",
    "nodes": {
        "node:0": "127.0.0.1:61920",
        "node:1": "127.0.0.1:61921",
        "node:2": "127.0.0.1:61922",
        "node:3": "127.0.0.1:61923",
        "node:4": "127.0.0.1:61924"
    },
    "devices": {
        "SPU": {
            "kind": "SPU",
            "config": {
                "node_ids": [
                    "node:0",
                    "node:1",
                    "node:2"
                ],
                "spu_internal_addrs": [
                    "127.0.0.1:61930",
                    "127.0.0.1:61931",
                    "127.0.0.1:61932"
                ],
                "experimental_data_folder": [
                    "/tmp/spu_data_0/",
                    "/tmp/spu_data_1/",
                    "/tmp/spu_data_2/"
                ],
                "runtime_config": {
                    "protocol": "ABY3",
                    "field": "FM64",
                    "enable_pphlo_profile": true,
                    "enable_hal_profile": true
                }
            }
        },
        "P1": {
            "kind": "PYU",
            "config": {
                "node_id": "node:3"
            }
        },
        "P2": {
            "kind": "PYU",
            "config": {
                "node_id": "node:4"
            }
        }
    }
}

上面这个配置,在哪个地方增加 'self_party': '' 以后,双方就可以各自执行各自的部分了?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

spu 本身没有这种能力,对安全性要求较高的生产环境请通过 secretflow 的生产模式来使用 spu,其提供多控制器模式支持 https://www.secretflow.org.cn/zh-CN/docs/secretflow/v1.3.0b0/getting_started/deployment#production

@notinghere
Copy link
Author

SPU distributed

嗯,我现在是想用 SPU distributed 测试模拟一下多方的跑法,所以,这个配置里面有没有可以配置类似sf 中self_party 的位置,这样我可以开两个终端,分别以 Alice和bob 的模式让他们跑起来?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

如果只是模拟,只需要把第一步启动 nodectl 的过程改成在每一方启动 node,然后运行一个 driver 即可

@notinghere
Copy link
Author

如果只是模拟,只需要把第一步启动 nodectl 的过程改成在每一方启动 node,然后运行一个 driver 即可

老师,driver 可以分开执行吗?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

P1 和 P2 只需要启动节点,这段代码只是描述计算任务和触发计算任务的 driver

前面已经提到了,driver 只是“描述计算任务和触发计算任务”,真正执行计算的是 node,没懂为什么要 driver 分开执行

@notinghere
Copy link
Author

P1 和 P2 只需要启动节点,这段代码只是描述计算任务和触发计算任务的 driver

前面已经提到了,driver 只是“描述计算任务和触发计算任务”,真正执行计算的是 node,没懂为什么要 driver 分开执行

  1. 我看到node 仅是做计算操作,属于常驻服务。
  2. driver 描述计算任务。同时也担当了数据输入和结果输出的作用。如果仅一方执行driver,怎么做数据输入和结果获取呢?

@tpppppub
Copy link
Member

tpppppub commented Nov 8, 2024

rand_from_alice 只是示例,你可以改成从 p1 本地文件读取的函数

@notinghere
Copy link
Author

rand_from_alice 只是示例,你可以改成从 p1 本地文件读取的函数

  1. 是不是意味着 driver 中的方法与机器资源相关了,alice 操作数据的时候如果需要用到一些操作库,那么alice对应的机器上就需要提前部署好。
  2. 如果bob执行driver, alice 如果需要一些输入和输出,也只能通过在json文件中配置地址,然后通过文件io操作。
  3. 如果在这个步骤以后需要做别的操作,没有执行driver的一方如何被通知到呢?

@tpppppub
Copy link
Member

tpppppub commented Nov 11, 2024

1: 是。
2&3: alice、bob 的输入输出都可以通过文件 io 操作,整个流程都写在 driver 里。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants