Skip to content

Commit

Permalink
Add ww serve command.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed Nov 20, 2024
1 parent d3270cf commit 4e14373
Show file tree
Hide file tree
Showing 20 changed files with 450 additions and 407 deletions.
14 changes: 5 additions & 9 deletions cmd/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
ma "github.com/multiformats/go-multiaddr"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
Expand Down Expand Up @@ -77,18 +76,19 @@ func run() cli.ActionFunc {
defer wasi.Close(c.Context)

return ww.Env{
IPFS: ipfs,
Host: h,
Cmd: system.Cmd{
Path: c.Args().First(),
Args: c.Args().Tail(),
Args: c.Args().Slice(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter},
Net: system.Net{
Proto: ww.Proto,
Proto: system.Proto.Unwrap(),
Host: h,
},
FS: system.FS{
FS: system.Anchor{
Ctx: c.Context,
Host: h,
IPFS: ipfs,
Expand All @@ -97,10 +97,6 @@ func run() cli.ActionFunc {
}
}

type procServer struct {
Host host.Host
}

func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {
var a ma.Multiaddr
if s := c.String("ipfs"); s == "local" {
Expand Down
192 changes: 71 additions & 121 deletions cmd/internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@ import (
"bytes"
"context"
"io"
"io/fs"
"log/slog"
"net/http"
"os"

"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
ma "github.com/multiformats/go-multiaddr"
"github.com/tetratelabs/wazero"
"github.com/thejerf/suture/v4"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/urfave/cli/v2"
ww "github.com/wetware/go"
"github.com/wetware/go/proc"
"github.com/wetware/go/system"
"github.com/wetware/go/util"
)

func Command() *cli.Command {
Expand All @@ -42,108 +40,70 @@ func Command() *cli.Command {
Usage: "enable wasm debug symbols",
},
},
Action: func(c *cli.Context) error {
ipfs, err := newIPFSClient(c)
if err != nil {
return err
}

h, err := libp2p.New()
if err != nil {
return err
}
defer h.Close()

s := suture.New("ww", suture.Spec{
EventHook: util.EventHook,
})

// Start a multicast DNS service that searches for local
// peers in the background.
s.Add(ww.MDNS{
Host: h,
Handler: ww.StorePeer{Peerstore: h.Peerstore()},
})

// Global compilation cache
cache := wazero.NewCompilationCache()
defer cache.Close(c.Context)

s.Add(ww.Server{
IPFS: ipfs,
Action: serve(),
}
}

func serve() cli.ActionFunc {
return func(c *cli.Context) error {
ipfs, err := newIPFSClient(c)
if err != nil {
return err
}

h, err := libp2p.New()
if err != nil {
return err
}
defer h.Close()

// Start a multicast DNS service that searches for local
// peers in the background.
d, err := ww.MDNS{
Host: h,
Handler: ww.StorePeer{Peerstore: h.Peerstore()},
}.New(c.Context)
if err != nil {
return err
}
defer d.Close()

r := wazero.NewRuntimeWithConfig(c.Context, wazero.NewRuntimeConfig().
WithDebugInfoEnabled(c.Bool("debug")).
WithCloseOnContextDone(true))

wasi, err := wasi_snapshot_preview1.Instantiate(c.Context, r)
if err != nil {
return err
}
defer wasi.Close(c.Context)

return ww.Env{
IPFS: ipfs,
Host: h,
Cmd: system.Cmd{
Args: c.Args().Slice(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter},
Net: system.Net{
Proto: system.Proto.Unwrap(),
Host: h,
Handler: system.HandlerFunc(func(ctx context.Context, p *proc.P) error {
slog.InfoContext(ctx, "process started",
"pid", p.String())

<-ctx.Done()
return ctx.Err()
}),
},
FS: system.Anchor{
Ctx: c.Context,
Host: h,
Env: ww.Env{
Cmd: system.Cmd{
Path: c.Args().First(),
Args: c.Args().Tail(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter,
},
Net: system.Net{
Proto: ww.Proto,
Host: h,
Handler: system.HandlerFunc(func(ctx context.Context, p *proc.P) error {
slog.InfoContext(ctx, "process started",
"peer", h.ID(),
"proc", p.String())
defer slog.WarnContext(ctx, "process stopped",
"peer", h.ID(),
"proc", p.String())
<-ctx.Done()
return ctx.Err()
}),
},
FS: system.FS{
Ctx: c.Context,
Host: h,
IPFS: ipfs,
},
},
RuntimeConfig: wazero.NewRuntimeConfig().
WithCompilationCache(cache).
WithDebugInfoEnabled(c.Bool("debug")).
WithCloseOnContextDone(true),
})

sub, err := h.EventBus().Subscribe([]any{
new(event.EvtLocalAddressesUpdated)})
if err != nil {
return err
}
defer sub.Close()

done := s.ServeBackground(c.Context)
for {
var v any
select {
case err := <-done:
return err // exit
case v = <-sub.Out():
// event received
}

// Synchronous event handler
switch ev := v.(type) {
case *event.EvtLocalAddressesUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(c.Context, "local addresses updated",
"peer", h.ID(),
"current", ev.Current,
"removed", ev.Removed,
"diffs", ev.Diffs)

case *event.EvtLocalProtocolsUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(c.Context, "local protocols updated",
"peer", h.ID(),
"added", ev.Added,
"removed", ev.Removed)
}

}
},
IPFS: ipfs,
},
}.Bind(c.Context, r)
}
}

Expand All @@ -160,29 +120,19 @@ func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {

func stdin(c *cli.Context) io.Reader {
switch r := c.App.Reader.(type) {
case interface{ Len() int }:
if r.Len() <= 0 {
break
}

return &io.LimitedReader{
R: c.App.Reader,
N: min(int64(r.Len()), 1<<32-1), // max u32
}

case interface{ Stat() (fs.FileInfo, error) }:
case *os.File:
info, err := r.Stat()
if err != nil {
slog.Error("failed to get file info for stdin",
"reason", err)
break
} else if info.Size() <= 0 {
panic(err)
}

if info.Size() <= 0 {
break
}

return &io.LimitedReader{
R: c.App.Reader,
N: min(info.Size(), 1<<32-1), // max u32
N: 1<<32 - 1, // max u32
}
}

Expand Down
7 changes: 4 additions & 3 deletions examples/deliver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (
)

func main() {
if nargs := len(os.Args); nargs < 1 {
args := os.Args[1:]
if nargs := len(args); nargs < 1 {
slog.Error("wrong number of arguments",
"want", 1,
"got", nargs,
"args", os.Args)
os.Exit(1)
}

f, err := os.Open(os.Args[0])
f, err := os.Open(args[0])
if err != nil {
slog.Error("failed to open file",
"reason", err,
"name", os.Args[0])
"name", args[0])
os.Exit(1)
}
defer f.Close()
Expand Down
Binary file modified examples/deliver/main.wasm
Binary file not shown.
23 changes: 14 additions & 9 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,25 @@ import (
"os"
)

//export echo
func echo() {
if err := _echo(os.Stdout, os.Stdin); err != nil {
panic(err)
}
}

func _echo(dst io.Writer, src io.Reader) error {
_, err := io.Copy(dst, src)
return err
}

func main() {
stdin := flag.Bool("stdin", false, "read data from stdin")
flag.Parse()

if *stdin {
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
slog.Error("failed echo stdin",
if err := _echo(os.Stdout, os.Stdin); err != nil {
slog.Error("failed to echo stdin",
"reason", err)
os.Exit(1)
}
Expand All @@ -34,10 +46,3 @@ func main() {
}
}
}

//export echo
func echo() {
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
panic(err)
}
}
Binary file modified examples/echo/main.wasm
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/tetratelabs/wazero v1.8.1
github.com/thejerf/suture/v4 v4.0.5
github.com/urfave/cli/v2 v2.27.3
golang.org/x/sync v0.8.0
)

require (
Expand Down Expand Up @@ -166,6 +165,7 @@ require (
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.23.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions proc/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package proc

type EvtStarted struct{ *P }
Loading

0 comments on commit 4e14373

Please sign in to comment.