This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

feat: First consumer implementation #3

Merged
merged 12 commits into main from feat/worker on Dec 8, 2023

Conversation

Contributor

@tomasfarias tomasfarias commented Dec 6, 2023

This PR includes an implementation of a hook-consumer to process webhook jobs. A webhook job is defined by its job parameters:

pub struct WebhookJobParameters {
    body: String,
    headers: collections::HashMap<String, String>,
    method: HttpMethod,
    url: String,
}

These should match the plugin-scaffold interface. Personally, I would prefer to use bytes for the body instead of String, to relax the UTF-8 constraint and avoid encoding issues altogether, but I don't think that's possible with JSON serialization.

The consumer runs in a loop:

  1. Waits forever (or until cancelled) until a job is available in the queue.
    • This wait is done by polling; the polling interval can be configured.
  2. Once a job is dequeued, a task is spawned to process it:
    1. Attempt to send a request according to the parameters.
    2. Update the job state according to the request result:
      • On error:
        • If the job has retries pending and the status code is retryable (429 or 5XX): retry the job.
        • If the job has no retries pending or the status code is not retryable: fail the job.
      • On OK: Complete the job.

The consumer takes a reference to a PgQueue, in case we decide that multiple consumers should be spawned. Currently this is not the case; instead, we follow the Tokio echo server pattern of having a single main task spawn a concurrent task per job.
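
Roughly, the loop looks like the sketch below. This is illustrative only: the WebhookJob stand-in, the helper bodies, and the timeout value are placeholders rather than the actual code in this PR.

use std::time::Duration;

// Hypothetical stand-in for the job type; the real one wraps WebhookJobParameters.
struct WebhookJob {
    url: String,
    body: String,
}

async fn wait_for_job() -> WebhookJob {
    // In the PR this polls PgQueue on a configurable interval until a job is available.
    unimplemented!()
}

async fn process_webhook_job(job: WebhookJob, timeout: Duration) {
    // Send the request; in the real consumer the result decides whether the job is
    // completed, retried (429/5XX with retries pending) or failed.
    let _response = reqwest::Client::new()
        .post(job.url)
        .timeout(timeout)
        .body(job.body)
        .send()
        .await;
}

#[tokio::main]
async fn main() {
    let request_timeout = Duration::from_secs(5);
    loop {
        // 1. Block (by polling) until a job is dequeued.
        let job = wait_for_job().await;
        // 2. Spawn a task per job; the main task immediately goes back to polling.
        tokio::spawn(process_webhook_job(job, request_timeout));
    }
}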

Other changes:

  • Update development docker-compose stack to include an echo server and a migration service.
    • This reduces testing steps to just two, and removes the requirement to install sqlx-cli.
    • The echo server is implemented by a simple Caddyfile.
  • dequeue and dequeue_tx now take a generic J that implements Deserialize<'de> instead of DeserializeOwned. This should be less restrictive overall, although it's currently not critical.
  • PgQueue no longer takes a worker name, this is instead passed as a parameter when calling dequeue or dequeue_tx.
  • Reduced usage of chrono::Duration in favor of the standard library's std::time::Duration. The latter is more widely supported, and I appreciate working with unsigned types (as we generally do not accept negative durations).
    • chrono is still used when DateTime is required, like in parsing Retry-After.
  • Added EnvConfig to configure the consumer (a rough sketch of the shape follows this list).
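
For reference, a minimal sketch of what the EnvConfig could look like, assuming the envconfig crate; the variable names and defaults are illustrative rather than what this PR actually ships.

use envconfig::Envconfig;

#[derive(Envconfig)]
pub struct Config {
    // Hypothetical keys; the PR's actual EnvConfig may use different names and defaults.
    #[envconfig(from = "DATABASE_URL", default = "postgres://posthog:posthog@localhost:5432/hooks")]
    pub database_url: String,

    #[envconfig(from = "POLL_INTERVAL_SECS", default = "1")]
    pub poll_interval_secs: u64,

    #[envconfig(from = "REQUEST_TIMEOUT_SECS", default = "5")]
    pub request_timeout_secs: u64,
}

// Usage: let config = Config::init_from_env().expect("invalid configuration");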

@tomasfarias tomasfarias marked this pull request as ready for review December 6, 2023 13:46
Contributor

@bretthoerner bretthoerner left a comment

Looks great. Some of my feedback could become future issues or whatever you feel like.

reqwest = { version = "0.11" }
serde = { version = "1.0" }
serde_derive = { version = "1.0" }
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-native-tls", "postgres", "uuid", "json", "chrono" ] }
Contributor

Minor, and can be done later, but I like this workspace dep setup so we can have 1 place for versions: https://github.com/PostHog/rusty-hook/pull/2/files#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R15

Contributor Author

I agree. I was focusing on the consumer and forgot we were in a workspace. Should be quick to add, hold on!

Contributor Author

Actually, since this is being worked on in #2, I would leave it for later to avoid conflicts. We can merge that PR and then I'll either rebase this one if it's not merged yet, or open a new one to use the common dependencies.

hook-consumer/src/consumer.rs (outdated review thread, resolved)
retry_policy: RetryPolicy,
) -> PgQueueResult<Self> {
let name = queue_name.to_owned();
let table = table_name.to_owned();
let worker = worker_name.to_owned();
let pool = PgPoolOptions::new()
Contributor

I think we'll definitely want to tune max_connections (via the EnvConfig).

Also important to keep in mind that the number of connections is the limit on how many ongoing dequeue_tx-based jobs can be processing at once, since each will need to hold a connection all the way to PG for the entire time (right?).

Contributor Author

@tomasfarias tomasfarias Dec 7, 2023

Yeah, pretty much. We are currently not configuring the connection pool at all, as this will have to be synced with our Postgres instance. I can add the configuration keys (max_connections, acquire_timeout, idle_timeout, max_lifetime), and we can turn those knobs over time.
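
Something along these lines (sketch only; the numbers are placeholders to tune, not recommendations):

use std::time::Duration;

use sqlx::postgres::{PgPool, PgPoolOptions};

async fn connect(url: &str) -> Result<PgPool, sqlx::Error> {
    PgPoolOptions::new()
        .max_connections(100)                     // also caps concurrent dequeue_tx jobs
        .acquire_timeout(Duration::from_secs(5))  // fail fast if the pool is exhausted
        .idle_timeout(Duration::from_secs(300))
        .max_lifetime(Duration::from_secs(1800))
        .connect(url)
        .await
}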

let webhook_job = self.wait_for_job().await?;

let request_timeout = self.request_timeout; // Required to avoid capturing self in closure.
tokio::spawn(async move { process_webhook_job(webhook_job, request_timeout).await });
Contributor

@bretthoerner bretthoerner Dec 6, 2023

I think we need a count of outstanding requests to keep a cap on this loop. If I'm reading correctly and we start this process/loop up with, say, 10,000 items sitting in the table, it will try to run all 10,000 tasks in parallel. Well, that's if it's dequeue (non-txn) based.

If it's dequeue_tx-based it will be capped on PG waiting for connections. If we're deliberately going to lean on that then I think we should document it here.

Contributor

Also, since we spawn, what happens to Errors returned by process_webhook_job? I think we need some wrapper to do logging/metrics? I guess we probably want metrics for Ok() results too.

Contributor

@bretthoerner bretthoerner Dec 6, 2023

One more: I wonder how much we'll care about graceful shutdown? (It would be nice not to send duplicates every time we do an upgrade.) By firing off spawns, I'm not sure how we track that there is ongoing work we'd like to give a few seconds to complete.

I guess if we had the counter/semaphore thing in my first comment above, we could start a shutdown (not dequeue new tasks) and watch for the counter/semaphore to reach 0?

Or we could just stop dequeuing and sleep for a little more than 5 seconds (or whatever the max timeout is). That feels a little ugly, but it would work OK for now; it may break as requirements/assumptions change.

Contributor Author

I think we need a count of outstanding requests to keep a cap on this loop. If I'm reading correctly and we start this process/loop up with, say, 10,000 items sitting the table it will try to run all 10,000 tasks in parallel. Well, that's if it's dequeue (non-txn) based.

Not right now: dequeue also acquires a connection, and if there are no connections available in the pool we block on acquire(), similar to dequeue_tx. So, for now, this is not a problem.

However, if we pass a pool instead of a connection, like you suggested in the comment below, we would block only on the DB calls. E.g., if our PG connection limit is 10 and all of our 10 worker threads reach a DB call, the main task won't be able to acquire a connection to query for new jobs.

Ignoring that for a moment, what's the problem with high concurrency? We are still going to wait on IO, so we might as well keep firing requests; why do you think we need a cap (a count is okay, I'm always happy to track it)? Tokio will evenly distribute work across CPU cores, so we should get even usage across the board, and we can bump/autoscale the CPU if that's struggling.

The only problem I can imagine is a lack of host-awareness causing us to involuntarily DDoS common hosts. I would prefer exploring a more host-aware cap rather than a cap on the entire loop, as a general cap would be unfair to less common hosts. But I'm happy to hear other issues; I may be missing something.

If it's dequeue_tx-based it will be capped on PG waiting for connections. If we're deliberately going to lean on that then I think we should document it here.

Yes. dequeue_tx has to hold a transaction, which means we have to hold a connection; I can't think of a way around this. I guess this is the price we pay, and it all comes down to PG performance. Happy to document 👍

Also, since we spawn, what happens to Errors returned by process_webhook_job? I think we need some wrapper to do logging/metrics? I guess we probably want metrics for Ok() results too.

Definitely need metrics here. Probably going to copy the setup at #2 (but without a web middleware).

One more, I wonder how much we'll care about graceful shutdown?
I guess if we had the counter/semaphore thing in my first comment above, we could start a shutdown (not dequeue new tasks) and watch for the counter/semaphore to reach 0?

I like this idea, although I would put a time limit on the counter so we actually shut down even if it takes too long; I think we can tolerate some duplicates, as the expectation with webhooks should be at-least-once delivery.

Tokio has great support for graceful shutdown handling: https://tokio.rs/tokio/topics/shutdown. But if you agree, I would add this in a follow-up PR for size considerations. Happy to make this a blocking requirement of the overall application, as I think it should be present in v1 (just because it's in a separate PR doesn't mean I'm pushing this to v2).
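
To make the idea concrete, a rough sketch, using a JoinSet in place of the counter/semaphore and the hypothetical wait_for_job/process_webhook_job helpers from the sketch in the description; the real implementation may differ.

use std::time::Duration;

use tokio::{select, signal, task::JoinSet, time};

async fn run_until_shutdown(request_timeout: Duration) {
    let mut in_flight = JoinSet::new();

    loop {
        select! {
            // Stop dequeuing new jobs on SIGINT.
            _ = signal::ctrl_c() => break,
            job = wait_for_job() => {
                in_flight.spawn(process_webhook_job(job, request_timeout));
            }
        }
    }

    // Give in-flight jobs a bounded amount of time to finish; duplicates after
    // the deadline are acceptable since delivery is at-least-once.
    let _ = time::timeout(Duration::from_secs(10), async {
        while in_flight.join_next().await.is_some() {}
    })
    .await;
}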

Contributor

@bretthoerner bretthoerner Dec 7, 2023

what's the problem with high concurrency?

I just have a sense of unease around things without limits. Async tasks may be cheaper than threads, but they aren't free, and there is an eventual limit on memory, file descriptors, network traffic, etc.

(If dequeue_tx actually works in practice then there is basically no world where the number of async tasks we can run (i.e. only up to the PG connection limit) is going to hurt us. I'm still a little suspicious that dequeue_tx will work in practice, though.)

If you'd prefer to just start by tracking it with a Prometheus Gauge, that sounds good to me. If nothing else, knowing that the count equals the PG connection limit would tell us we are limited by PG connections, which seems like useful observability for dequeue_tx.

I would ask that you go ahead and document that it's currently only limited by the PG connection pool, because if I read this code in the future without that comment I would raise my eyebrows. It's just scar tissue. :)

And I agree we'll also want some kind of per-host limits (or at least metrics) eventually. Hopefully the HTTP connection pool inside reqwest can help there.

Contributor Author

I've added a comment to reference the current connection limit 👍

I agree that dequeue_tx still has to be fixed, but the fix, as I see it, is that it should be allowed to blast through the queue, basically without limits. I think there are a few things we can do to address our concerns:

  • Memory limits: Enforced limits on payload size (which we can even encode in the type system).
  • File descriptors/network (indirectly): Limiting the number of connections reqwest::Client hands out and making new requests wait for one (unfortunately, I can't find anything baked into Client, but we can wrap this in a tokio Semaphore or something; see the sketch after this list).
  • CPU: Tokio should already be dealing with this by setting a default worker limit.
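
For reference, a sketch of the Semaphore idea; the cap, the function, and its signature are all illustrative:

use tokio::sync::Semaphore;

// Illustrative cap on concurrent outbound requests; construct the Semaphore once
// with Semaphore::new(MAX_CONCURRENT_REQUESTS) and share it across tasks via Arc.
const MAX_CONCURRENT_REQUESTS: usize = 512;

async fn send_limited(
    client: &reqwest::Client,
    limiter: &Semaphore,
    url: String,
    body: String,
) -> Result<reqwest::Response, reqwest::Error> {
    // Wait for a slot; the permit is released when `_permit` is dropped.
    let _permit = limiter.acquire().await.expect("semaphore closed");
    client.post(url).body(body).send().await
}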

Contributor Author

Anyway, my thought is to merge this and work on that next (also addressing the other comments).

body: String,
timeout: std::time::Duration,
) -> Result<reqwest::Response, WebhookConsumerError> {
let client = reqwest::Client::new();
Contributor

If I'm reading the internals right, this is truly constructing a fresh client from nothing every time. It doesn't have to be now, but I think we'll really want to reuse a pool of some kind, since we'll be hammering the same servers and could reuse connections.

It looks like reqwest is building a ClientBuilder (which has pool support) under the hood, so maybe we can just make one of those toplevel and reuse it?

Contributor

Oh, ClientBuilder even takes a timeout that'll apply to all of them.

Contributor Author

We could definitely reuse the client, and I like the idea of configuring it once for everything. I'm not sure how effective reusing connections would be, as it would depend on our workload and pool size, but happy to try it out.
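
For reference, a sketch of building one shared client up front; the timeout and per-host idle limit are illustrative values:

use std::time::Duration;

fn build_client(request_timeout: Duration) -> Result<reqwest::Client, reqwest::Error> {
    reqwest::Client::builder()
        .timeout(request_timeout)   // applies to every request sent through this client
        .pool_max_idle_per_host(10) // keep some warm connections per destination host
        .build()
}

Client is reference-counted internally, so cloning it into each spawned task is cheap.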

})
}

/// Dequeue a Job from this PgQueue to work on it.
pub async fn dequeue<J: DeserializeOwned + std::marker::Send + std::marker::Unpin + 'static>(
pub async fn dequeue<
Contributor

@bretthoerner bretthoerner Dec 6, 2023

I see this one isn't used by the consumer; I take it the idea is to try the txn-based one first and swap this in via a code change if we have issues?

Before I forget, I had a couple of related observations that I missed in the previous review:

  1. It's odd to me that we close the connection if no rows are returned; that doesn't seem like an error. I think we could just drop it so it (I assume) returns to the connection pool for reuse?
  2. I don't think we want to return the Connection in the PgJob for non-txn dequeue, there's no reason to tie up the PG connection for the lifetime of the webhook send, right?

But both of these are unimportant if we end up sticking with dequeue_tx. 🤷‍♂️

Contributor Author

@tomasfarias tomasfarias Dec 7, 2023

I take it the idea is to try the txn-based one first and swap this by code changes in if we have issues?

Yes, although we could also make this configurable with, e.g., an enable_transactional key.

EDIT: Currently, I would only run transactional mode. Non-txn mode requires clean-up of orphan tasks. I haven't thought about who would execute such clean-up tasks, and how we would avoid contention with multiple pods. To give you an idea, River queue has a whole leader-election scheme built on PG (with a PG table and PG channels) to ensure only one leader executes clean-up tasks. We could implement a similar scheme: have each consumer spawn the maintenance services, which would continuously poll to see if they are the leader and can actually run the services. However, I would leave this for a separate PR, or for after we see how far we can get with tx-mode.

It's odd to me that we close the connection if no rows are returned, that doesn't seem like an error? I think we could drop it so it (I assume) returns to the connection pool for reuse?

We can let it drop; I was following the docs and being explicit with close:

This method is not required for safe and consistent operation. However, it is recommended to call it instead of letting a connection drop as the database backend will be faster at cleaning up resources.

I now think this was meant to describe cleaning up a non-pooled connection, which is not our case.

I don't think we want to return the Connection in the PgJob for non-txn dequeue, there's no reason to tie up the PG connection for the lifetime of the webhook send, right?

True, we can pass the pool instead. Some lifetime wrangling may be required, but should be doable.

Contributor

@bretthoerner bretthoerner Dec 7, 2023

Isn't there cleanup required in the dequeue_tx code paths also? At a minimum, we're currently leaving behind completed and failed job rows. I haven't checked what else River's cleanups do.

Contributor

@bretthoerner bretthoerner Dec 7, 2023

dequeue_tx could just DELETE rows when it's done -- but I don't know what we have in mind for customer-facing metrics. If we're going to bill for this, I assume we need to show them success/fail as graphs or something? It seems convenient for a scheduled job to scrape the jobs table and push those into CH as it does the deletes, or something. I'm just riffing; maybe we had other plans? (I was out when this project was discussed. Ah, there is a "global management service" mentioned that needs to do other things; I feel like it could do cleanup too? Although that doc says the consumer "writes metrics to CH app_metrics", I guess either one could do it. It seems convenient to do it as part of cleaning up dead rows, though.)

btw, I imagine River is just trying to keep its dependencies minimal (i.e. make no expectations about k8s or anything). For our needs we could run a singleton k8s pod that has a separate hook-cron process that runs every minute or something, right?

Contributor

True, we can pass the pool instead. Some lifetime wrangling may be required, but should be doable.

I'm totally fine if you just want to focus on dequeue_tx for now; it seems like we'll only want one implementation in the end. But if we fall back to working on dequeue, then I definitely don't think we should hold a Connection object for no good reason. That, to me, seems like a major benefit of the dequeue style.

Contributor Author

Isn't there cleanup required in the dequeue_tx code paths also? At minimum we're currently leaving behind completed and failed job rows.

Yeah, but cleanup in dequeue_tx is not that critical: the failed and completed jobs will be skipped by consumers and will remain there wasting space, but not much else, assuming we have an index on status. The table is pretty light, so if we provision a moderately sized PG, this wasted space would not amount to much, even if we don't clean it up frequently.

With regular dequeue, the cleanup is critical to actually deliver the webhooks: orphan jobs need to be reset from running status so that consumers know they can retry them. Alternatively, we could look into a work-stealing approach where consumers proactively pick up jobs that have been running for a long time, obviously at the risk of duplicates. All in all, things get trickier without transactions.
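
For illustration, the kind of maintenance query non-txn mode would need; the table name, column names, and threshold are assumptions, not the actual schema:

async fn reset_orphan_jobs(pool: &sqlx::PgPool) -> Result<u64, sqlx::Error> {
    // Make jobs stuck in 'running' visible to consumers again.
    let result = sqlx::query(
        "UPDATE job_queue
         SET status = 'available'
         WHERE status = 'running'
           AND last_attempted_at < NOW() - INTERVAL '5 minutes'",
    )
    .execute(pool)
    .await?;

    Ok(result.rows_affected())
}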

Both modes would require a solution for tracking (as I discuss below)...

but I don't know what we have in mind for customer facing metrics. If we're going to bill for this, I assume we need to show them success/fail as graphs or something?

Yeah, as a starting point, that's what is in the spec: "success / failure (into app_metrics table in CH)".

dequeue_tx could just DELETE rows when it's done.

Yeah, I like this idea; I originally used a DELETE FROM query instead of UPDATE. This cuts the number of statuses required down to one; in other words, we can drop the status field: a job is either in the queue or it isn't, and that's the status. However, this means we can't simply query the queue table to get metrics for reporting/debugging.
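
A sketch of what completion could look like under that scheme (table and column names are assumptions); a RETURNING clause could hand the deleted row to whatever pushes metrics to CH:

async fn complete_job(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    job_id: i64,
) -> Result<(), sqlx::Error> {
    // Deleting the row inside the open transaction marks the job as done;
    // no status column is needed.
    sqlx::query("DELETE FROM job_queue WHERE id = $1")
        .bind(job_id)
        .execute(&mut **tx)
        .await?;

    Ok(())
}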

@tomasfarias tomasfarias merged commit e461d06 into main Dec 8, 2023
4 checks passed
@tomasfarias tomasfarias deleted the feat/worker branch December 8, 2023 10:57