How to use the partitioning strategy with the http client? #953
Comments
I was able to use the partitioning service. However, I also want to add a new behavior: fall back to the configured load balancing strategy when the header that carries the partition key is not present. Here is the module:

    import java.nio.charset.StandardCharsets

    import com.twitter.finagle
    import com.twitter.finagle.{Service, ServiceFactory, Stack}
    import com.twitter.finagle.Stack.Params
    import com.twitter.finagle.http.{Request, Response}
    import com.twitter.finagle.loadbalancer.LoadBalancerFactory
    import com.twitter.finagle.partitioning.{ConsistentHashPartitioningService, PartitioningService}
    import com.twitter.finagle.partitioning.param.{KeyHasher, NumReps}
    import com.twitter.util.Future

    object AffinityHttpClientModule {
      val role = Stack.Role("HttpPartitioning")

      val module = new Stack.Module[ServiceFactory[Request, Response]] {
        val parameters = Seq(
          implicitly[Stack.Param[LoadBalancerFactory.Dest]],
          implicitly[Stack.Param[finagle.param.Stats]]
        )

        def newConsistentHashPartitioningService(
          underlying: Stack[ServiceFactory[Request, Response]],
          params: Params
        ): ConsistentHashPartitioningService[Request, Response, String] = {
          val KeyHasher(hasher) = params[KeyHasher]
          val NumReps(numReps) = params[NumReps]
          new ConsistentHashPartitioningService[Request, Response, String](underlying, params, hasher, numReps) {
            // Use the public charset constant instead of the JDK-internal sun.nio.cs.UTF_8
            override protected def getKeyBytes(key: String): Array[Byte] =
              key.getBytes(StandardCharsets.UTF_8)

            // The partition key is the value of the "request-key" header, if present
            override protected def getPartitionKeys(request: Request): Iterable[String] =
              request.headerMap.get("request-key").toList

            override protected def createPartitionRequestForKeys(original: Request, keys: Seq[String]): Request = {
              // TODO We are not creating multiple requests from one so...
              original
            }

            override protected def mergeResponses(
              originalReq: Request,
              results: PartitioningService.PartitionedResults[Request, Response]
            ): Response = {
              // TODO I'm not doing N requests so this should be enough
              results.successes.toList match {
                case (_, res) :: _ => res
                case _ => throw results.failures.toList.head._2
              }
            }

            override protected def noPartitionInformationHandler(req: Request): Future[Nothing] =
              Future.exception(new Exception("request-key header is missing"))
          }
        }

        final override def make(
          params: Params,
          next: Stack[ServiceFactory[Request, Response]]
        ): Stack[ServiceFactory[Request, Response]] = {
          val partitioningService: Service[Request, Response] =
            newConsistentHashPartitioningService(next, params)
          // TODO validate if it is ok to materialize the stack here
          val serviceFactory = next.make(params)
          Stack.leaf(role, ServiceFactory.const(new Service[Request, Response] {
            // Dispatch exactly once: the stray unconditional call to partitioningService was removed
            override def apply(request: Request): Future[Response] = {
              // if the request key is set choose the host based on the consistent hash of the key
              if (request.headerMap.contains("request-key"))
                partitioningService(request)
              else {
                // otherwise, load balance using the configured load balancer,
                // and release the acquired service once the response is done
                serviceFactory().flatMap { svc =>
                  svc(request).ensure { svc.close(); () }
                }
              }
            }
          }))
        }

        override def role: Stack.Role = AffinityHttpClientModule.role
        override def description: String = ""
      }
    }

And then I do an insertAfter in the HTTP client stack:

    def client: Http.Client = Client().withStack(_.insertAfter(
      BindingFactory.role,
      AffinityHttpClientModule.module
    ))
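To make the intended routing rule concrete, here is a minimal, dependency-free sketch of "hash the key when present, otherwise fall back". It is not Finagle's implementation: `AffinityRouter`, its MD5-based hash, the `replicas` default, and the round-robin fallback are all hypothetical stand-ins chosen for illustration (round-robin stands in for "the configured load balancer").

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable.TreeMap

// Hypothetical model of the routing decision; not part of Finagle.
final class AffinityRouter(nodes: Vector[String], replicas: Int = 100) {
  require(nodes.nonEmpty, "at least one node is required")

  // Hash a string to a Long using the first 8 bytes of its MD5 digest.
  private def hash(s: String): Long = {
    val digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8))
    ByteBuffer.wrap(digest).getLong
  }

  // Each node is placed on the ring `replicas` times to even out the key distribution.
  private val ring: TreeMap[Long, String] =
    TreeMap((for (n <- nodes; i <- 0 until replicas) yield hash(s"$n#$i") -> n): _*)

  private val counter = new AtomicInteger(0)

  // Round-robin over all nodes; an assumption standing in for the real load balancer.
  private def fallback(): String =
    nodes(Math.floorMod(counter.getAndIncrement(), nodes.size))

  // With a key: take the first ring entry at or after the key's hash, wrapping to the start.
  // Without a key: delegate to the fallback.
  def pick(key: Option[String]): String = key match {
    case Some(k) =>
      val it = ring.iteratorFrom(hash(k))
      if (it.hasNext) it.next()._2 else ring.head._2
    case None => fallback()
  }
}
```

In this sketch the value of the "request-key" header would be passed as `key`, so `router.pick(request.headerMap.get("request-key"))` always returns the same node for the same key, while requests without the header are spread across all nodes.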
Is your feature request related to a problem? Please describe.
I want to perform load balancing based on the request, to get affinity with servers that have a hot cache for the data each request will access.
Currently this can't be done in the Load Balancer layer because the pick method doesn't have access to the request.
Describe the solution you'd like
I think a partitioning-aware client could be the solution, but I wonder why the service ConsistentHashPartitioningService is private, which makes it impossible to reuse with an HTTP client. Even if that class weren't private, I'm not sure how the wiring should be done; it seems I need to use the Stack API.
Describe alternatives you've considered
Using a ServiceFactory to instantiate a service per partition (since it performs load balancing only once), but this makes the client much harder to use, since I now have to maintain the mapping from key to service.
Besides that, I don't have a way to achieve the affinity, that is, to tell each service instance which subset of nodes it should load balance over.