Relax reliance on SocketGroup #928

Closed
taig opened this issue Jul 20, 2023 · 1 comment · Fixed by #930

Comments

@taig
Contributor

taig commented Jul 20, 2023

Creating a Session requires a SocketGroup, starting with Protocol.apply all the way down to BitVectorSocket.apply. However, only SocketGroup.client(to: SocketAddress[Host], options: List[SocketOption]): Resource[F, Socket[F]] is used. So instead of passing down a SocketGroup, from my understanding and field tests, a Resource[F, Socket[F]] would suffice. This would allow for a more general way to create Sessions, e.g. Session.fromSockets[F[_] : Temporal : Tracer : Console](sockets: Resource[F, Socket[F]], user: String, database: String, ...).
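
To make the point concrete, here is a minimal sketch of the two ways a `Resource[F, Socket[F]]` can be obtained with fs2. It only uses calls that already appear in this issue (`SocketGroup.client` and `UnixSockets[F].client`). The `SocketSources` object and its method names are hypothetical and exist only for illustration; a relaxed entry point such as the proposed `Session.fromSockets` could presumably accept either result.

```scala
import cats.effect.Resource
import com.comcast.ip4s.{Host, SocketAddress}
import fs2.io.net.{Socket, SocketGroup, SocketOption}
import fs2.io.net.unixsocket.{UnixSocketAddress, UnixSockets}

// Hypothetical helper object; the name and both methods are illustrative only.
object SocketSources:

  // The only SocketGroup capability skunk relies on: opening a client socket.
  def viaSocketGroup[F[_]](
      group: SocketGroup[F],
      to: SocketAddress[Host],
      options: List[SocketOption]
  ): Resource[F, Socket[F]] =
    group.client(to, options)

  // The same result type from a Unix domain socket, with no host or port involved.
  def viaUnixSocket[F[_]: UnixSockets](path: String): Resource[F, Socket[F]] =
    UnixSockets[F].client(UnixSocketAddress(path))
```

Both methods return the same type, which is why passing the resource itself, rather than the whole `SocketGroup`, would be enough.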

My motivation to supply my own Socket comes from Google Cloud Run with Google Cloud SQL, where the recommended (and most frictionless) way to connect to a Postgres database is via unix sockets. Connecting via TCP on the other hand is quite cumbersome. Google provides JDBC SocketFactories to make this integration easy, but obviously that does not work with skunk.

For now I have worked around the skunk API with a somewhat hacky solution, but I'd prefer a cleaner way to supply my fs2 Unix socket:

import cats.MonadThrow
import cats.effect.std.Console
import cats.effect.{Resource, Temporal}
import cats.syntax.all.*
import com.comcast.ip4s.{Host, IpAddress, Port, SocketAddress}
import fs2.Stream
import fs2.io.net.unixsocket.{UnixSocketAddress, UnixSockets}
import fs2.io.net.{Socket, SocketGroup, SocketOption}
import natchez.Trace
import skunk.Session
import skunk.Session.Recyclers
import skunk.net.SSLNegotiation
import skunk.net.protocol.{Describe, Parse}
import skunk.util.{Pool, Typer}

import scala.concurrent.duration.Duration

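// Adapter that satisfies skunk's SocketGroup requirement while always connecting to the Cloud SQL Unix socket.
final private class UnixSocketGroup[F[_]: MonadThrow](sockets: UnixSockets[F])(instanceConnectionName: String)
    extends SocketGroup[F]:
  // The requested address and socket options are ignored; the socket path is derived from the instance name.
  override def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[F, Socket[F]] =
    sockets.client(UnixSocketAddress(s"/cloudsql/$instanceConnectionName/.s.PGSQL.5432"))

  // skunk only opens client sockets, so the server-side operations are left unsupported.
  override def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): Stream[F, Socket[F]] =
    new UnsupportedOperationException().raiseError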

  override def serverResource(
      address: Option[Host],
      port: Option[Port],
      options: List[SocketOption]
  ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
    new UnsupportedOperationException().raiseError

object SkunkUnixsocketSession:
  def pooled[F[_]: Temporal: Trace: UnixSockets: Console](
      instanceConnectionName: String,
      user: String,
      database: String,
      password: Option[String] = none,
      max: Int,
      debug: Boolean = false,
      strategy: Typer.Strategy = Typer.Strategy.BuiltinsOnly,
      parameters: Map[String, String] = Session.DefaultConnectionParameters,
      socketOptions: List[SocketOption] = Session.DefaultSocketOptions,
      commandCache: Int = 1024,
      queryCache: Int = 1024,
      parseCache: Int = 1024,
      readTimeout: Duration = Duration.Inf
  ): Resource[F, Resource[F, Session[F]]] = pooledF[F](
    instanceConnectionName,
    user,
    database,
    password,
    max,
    debug,
    strategy,
    parameters,
    socketOptions,
    commandCache,
    queryCache,
    parseCache,
    readTimeout
  ).map(_.apply(Trace[F]))

  def pooledF[F[_]: Temporal: UnixSockets: Console](
      instanceConnectionName: String,
      user: String,
      database: String,
      password: Option[String] = none,
      max: Int,
      debug: Boolean = false,
      strategy: Typer.Strategy = Typer.Strategy.BuiltinsOnly,
      parameters: Map[String, String] = Session.DefaultConnectionParameters,
      socketOptions: List[SocketOption] = Session.DefaultSocketOptions,
      commandCache: Int = 1024,
      queryCache: Int = 1024,
      parseCache: Int = 1024,
      readTimeout: Duration = Duration.Inf
  ): Resource[F, Trace[F] => Resource[F, Session[F]]] =
    def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F])(using
        T: Trace[F]
    ): Resource[F, Session[F]] = for
      pc <- Resource.eval(Parse.Cache.empty[F](parseCache))
      session <- Session.fromSocketGroup[F](
        socketGroup,
        host = "ignore",
        port = 5432,
        user,
        database,
        password,
        debug,
        strategy,
        socketOptions,
        sslOp,
        parameters,
        cache,
        pc,
        readTimeout
      )
    yield session

    for
      cache <- Resource.eval(Describe.Cache.empty[F](commandCache, queryCache))
      pool <- Pool.ofF(
        (trace: Trace[F]) =>
          session(new UnixSocketGroup[F](UnixSockets[F])(instanceConnectionName), none, cache)(using trace),
        max
      )(Recyclers.full)
    yield pool

I have already experimented with the skunk API and made the necessary changes, which was straightforward. If this patch is desired for the project, I'd be happy to work on a proper PR.

@armanbilge
Member

> Connecting via TCP on the other hand is quite cumbersome

Note that Unix Sockets are also using TCP. I think you mean connecting with IP is quite cumbersome :)


The fact that IP and Unix sockets are handled completely differently has been frustrating. For the record, the underlying issue is in ip4s.

If we could fix that, then SocketGroup would work with both IP and Unix sockets, depending on the address passed to it.
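
As a rough illustration of that idea, the sketch below models an address type that covers both cases and a single `client` method that picks the transport from the address. Everything here (`Address`, `FakeSocket`, `UnifiedSocketGroup`) is a hypothetical stand-in written with the standard library only. It is not the ip4s or fs2 API and it opens no real sockets.

```scala
// Hypothetical address model; these names are NOT the ip4s or fs2 API.
enum Address:
  case Ip(host: String, port: Int)
  case Unix(path: String)

// Hypothetical stand-in for a connected socket, recording only how it was opened.
final case class FakeSocket(description: String)

// One entry point that chooses the transport from the address it receives.
object UnifiedSocketGroup:
  def client(to: Address): FakeSocket = to match
    case Address.Ip(host, port) => FakeSocket(s"ip:$host:$port")
    case Address.Unix(path)     => FakeSocket(s"unix:$path")
```

With a model like this, callers would keep passing a single group and vary only the address, so an adapter class such as `UnixSocketGroup` above would no longer be needed.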


I'm not a regular maintainer here, but the proposal to relax to a Resource[F, Socket[F]] seems like a good idea.
