Skip to content

Commit

Permalink
Axe svix
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-lippert committed Aug 21, 2023
1 parent 127c7a9 commit 24b9f13
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 152 deletions.
4 changes: 1 addition & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
},
"dependencies": {
"@upstash/kafka": "^1.3.3",
"itty-durable": "^2.1.0",
"itty-router": "^4.0.15",
"svix": "^1.8.1"
"itty-router": "^4.0.22"
},
"devDependencies": {
"prettier": "^3.0.2",
Expand Down
101 changes: 25 additions & 76 deletions worker.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import { Kafka } from '@upstash/kafka'
import { createDurable, withDurables } from 'itty-durable'
import { Router, error, json, withParams } from 'itty-router'
import { Svix } from 'svix'

let kafkaConfig
let svixSecret
let kafka

const withCtx = async (request, env) => {
request.ctx = await env.CTX.fetch(request).then((res) => res.json())
if (!kafkaConfig)
kafkaConfig = {
if (!kafka)
kafka = new Kafka({
url: env.KAFKA_URL,
username: env.KAFKA_USERNAME,
password: env.KAFKA_PASSWORD,
}
if (!svixSecret) svixSecret = env.SVIX_SECRET
})
if (!request.ctx.user) {
return Response.redirect('/login')
}
Expand All @@ -24,16 +20,10 @@ const withCtx = async (request, env) => {
description: 'Cloudflare Worker API for Kafka with webhooks',
url: 'https://kafka.do',
endpoints: {
listAll: request.ctx.origin + '/',
listQueues: request.ctx.origin + '/',
consume: request.ctx.origin + '/:queue',
produce: request.ctx.origin + '/:queue/send/:message',
sendBatch: request.ctx.origin + '/:queue/sendBatch',
acknowledgeAll: request.ctx.origin + '/:queue/ackAll',
retryAll: request.ctx.origin + '/:queue/retryAll',
acknowledge: request.ctx.origin + '/:queue/ack/:messageId',
retry: request.ctx.origin + '/:queue/retry/:messageId',
listWebhooks: request.ctx.origin + '/:queue/webhook',
createWebhook: request.ctx.origin + '/:queue/webhook/:url',
},
memberOf: 'https://apis.do/pubsub',
login: request.ctx.origin + '/login',
Expand All @@ -45,89 +35,48 @@ const withCtx = async (request, env) => {
const router = Router()
router.all('*', withCtx)
router.all('*', withParams)
router.all('*', withDurables())

router.get('/', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('/:queue', async (request) => {
return json({ api: request.api, user: request.ctx.user })
const { queue } = request.params
const c = kafka.consumer()
const data = await c.consume({
consumerGroupId: 'group_1',
instanceId: 'instance_1',
topics: [queue],
autoOffsetReset: 'earliest',
})

return json({ api: request.api, data, user: request.ctx.user })
})

router.get('/:queue/send/:message', async (request) => {
return json({ api: request.api, user: request.ctx.user })
const p = kafka.producer()
const { queue, message } = request.params
const data = await p.produce(queue, message)
return json({ api: request.api, data, user: request.ctx.user })
})

router.post('/:queue/sendBatch', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('/:queue/ackAll', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('/:queue/retryAll', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('/:queue/ack/:messageId', async (request) => {
return json({ api: request.api, user: request.ctx.user })
const { queue } = request.params
const messages = await request.json()
const p = kafka.producer()
const options = undefined
const data = await p.produceMany(messages.map((value) => ({ topic: queue, value, options })))
return json({ api: request.api, data, user: request.ctx.user })
})

router.get('/:queue/retry/:messageId', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('/:queue/webhook/:url', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('/:queue/webhook', async (request) => {
return json({ api: request.api, user: request.ctx.user })
})

router.get('*', (request) => error(404, { api: request.api, error: 'Not Found', user: request.ctx.user }))

export default {
fetch(request, env) {
return router.handle(request, env)
},
}

export class TopicManager extends createDurable({ autoReturn: true, autoPersist: true }) {
constructor(state, env) {
super(state, env)
this.topics = []
}

listTopics() {
return this.topics.map(({ name }) => name)
}

addTopic(name) {
this.topics.push({ name })
}

addWebhook(topic, url) {
const topicIndex = this.topics.findIndex(({ name }) => name === topic)
if (topicIndex === -1) {
this.topics.push({ name: topic })
}
const topicObj = this.topics[topicIndex]
if (!topicObj.webhooks) {
topicObj.webhooks = []
}
topicObj.webhooks.push(url)
}

removeWebhook(topic, url) {
const topicIndex = this.topics.findIndex(({ name }) => name === topic)
if (topicIndex === -1) return
const topicObj = this.topics[topicIndex]
if (!topicObj.webhooks) return
const urlIndex = topicObj.webhooks.findIndex((u) => u === url)
if (urlIndex === -1) return
topicObj.webhooks.splice(urlIndex, 1)
}
}
9 changes: 0 additions & 9 deletions wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,3 @@ compatibility_date = "2023-07-24"
services = [
{ binding = "CTX", service = "ctx-do", environment = "production" }
]

[durable_objects]
bindings = [
{ name = "TopicManager", class_name = "TopicManager" }
]

[[migrations]]
tag = "v1"
new_classes = ["TopicManager"]
71 changes: 7 additions & 64 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@
resolved "https://registry.yarnpkg.com/@esbuild/win32-x64/-/win32-x64-0.16.3.tgz#94047dae921949cfb308117d993c4b941291ae10"
integrity sha512-5/JuTd8OWW8UzEtyf19fbrtMJENza+C9JoPIkvItgTBQ1FO2ZLvjbPO6Xs54vk0s5JB5QsfieUEshRQfu7ZHow==

"@stablelib/base64@^1.0.0":
version "1.0.1"
resolved "https://registry.yarnpkg.com/@stablelib/base64/-/base64-1.0.1.tgz#bdfc1c6d3a62d7a3b7bbc65b6cce1bb4561641be"
integrity sha512-1bnPQqSxSuc3Ii6MhBysoWCg58j97aUjuCSZrGSmDxNqtytIi0k8utUenAwTZN4V5mXXYGsVUI9zeBqy+jBOSQ==

"@upstash/kafka@^1.3.3":
version "1.3.3"
resolved "https://registry.yarnpkg.com/@upstash/kafka/-/kafka-1.3.3.tgz#7db8dbc118ce1991aaa287474cc6a92e05438736"
Expand Down Expand Up @@ -329,11 +324,6 @@ end-of-stream@^1.1.0, end-of-stream@^1.4.1:
dependencies:
once "^1.4.0"

es6-promise@^4.2.4:
version "4.2.8"
resolved "https://registry.yarnpkg.com/es6-promise/-/es6-promise-4.2.8.tgz#4eb21594c972bc40553d276e510539143db53e0a"
integrity sha512-HJDGx5daxeIvxdBxvG2cb9g4tEvwIk3i8+nhX0yGrYmZUzbkdg8QbDevheDB8gd0//uPj4c1EQua8Q+MViT0/w==

[email protected]:
version "0.16.3"
resolved "https://registry.yarnpkg.com/esbuild/-/esbuild-0.16.3.tgz#5868632fa23f7a8547f2a4ea359c44e946515c94"
Expand Down Expand Up @@ -382,11 +372,6 @@ expand-template@^2.0.3:
resolved "https://registry.yarnpkg.com/expand-template/-/expand-template-2.0.3.tgz#6e14b3fcee0f3a6340ecb57d2e8918692052a47c"
integrity sha512-XYfuKMvj4O35f/pOXLObndIRvyQ+/+6AhODh+OKWj9S9498pHHn/IMszH+gt0fBCRWMNfk1ZSp5x3AifmnI2vg==

fast-sha256@^1.3.0:
version "1.3.0"
resolved "https://registry.yarnpkg.com/fast-sha256/-/fast-sha256-1.3.0.tgz#7916ba2054eeb255982608cccd0f6660c79b7ae6"
integrity sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==

[email protected]:
version "1.0.0"
resolved "https://registry.yarnpkg.com/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz#553a7b8446ff6f684359c445f1e37a05dacc33dd"
Expand Down Expand Up @@ -486,15 +471,10 @@ isomorphic-fetch@^3.0.0:
node-fetch "^2.6.1"
whatwg-fetch "^3.4.1"

itty-durable@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/itty-durable/-/itty-durable-2.1.0.tgz#a98aa85e09eaa1fa3c38fc6c71bbfd48edeecee9"
integrity sha512-iOfuUcMCMVfTS+qqd0miRw9caH+DuonwALoBGlfjqT6ed6yyskI2sWX6gAtDQsaueCRl3BdYRVpvlVNsuNsiNw==

itty-router@^4.0.15:
version "4.0.15"
resolved "https://registry.yarnpkg.com/itty-router/-/itty-router-4.0.15.tgz#e7ddcab5cac90ffff8edc01cecf24a7df851f061"
integrity sha512-hmjl7IPrMBRkOi5jPPfztHlm/XpQ62S37782oz14+LTmeih3fGRyB61aC/nh9ocmb7RExjESgTHE9UZIYT/qxQ==
itty-router@^4.0.22:
version "4.0.22"
resolved "https://registry.yarnpkg.com/itty-router/-/itty-router-4.0.22.tgz#2a5ba61f6b1df261d8321e8d218edcb0b59a0001"
integrity sha512-FWK13vcITqld32NaOh+k2unRrR4qBZdzA9O1ctpqT1Q4lxKrHbtgaX1lLTv2VyXj3/8Waqk/NlDGNfxypJ8GKA==

kleur@^4.1.5:
version "4.1.5"
Expand Down Expand Up @@ -585,9 +565,9 @@ node-abi@^3.3.0:
semver "^7.3.5"

node-fetch@^2.6.1:
version "2.6.12"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.12.tgz#02eb8e22074018e3d5a83016649d04df0e348fba"
integrity sha512-C/fGU2E8ToujUivIO0H+tpQ6HWo4eEmchoPIoXtxCrVghxdKq+QOHqEZW7tuP3KlV3bC8FRMO5nMCC7Zm1VP6g==
version "2.6.13"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.13.tgz#a20acbbec73c2e09f9007de5cda17104122e0010"
integrity sha512-StxNAxh15zr77QvvkmveSQ8uCQ4+v5FkvNTj0OESmiHu+VRi/gXArXtkWMElOsOUNLtUEvI4yS+rdtOHZTwlQA==
dependencies:
whatwg-url "^5.0.0"

Expand Down Expand Up @@ -654,11 +634,6 @@ pump@^3.0.0:
end-of-stream "^1.1.0"
once "^1.3.1"

querystringify@^2.1.1:
version "2.2.0"
resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.2.0.tgz#3345941b4153cb9d082d8eee4cda2016a9aef7f6"
integrity sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==

rc@^1.2.7:
version "1.2.8"
resolved "https://registry.yarnpkg.com/rc/-/rc-1.2.8.tgz#cd924bf5200a075b83c188cd6b9e211b7fc0d3ed"
Expand All @@ -685,11 +660,6 @@ readdirp@~3.6.0:
dependencies:
picomatch "^2.2.1"

requires-port@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff"
integrity sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==

rollup-plugin-inject@^3.0.0:
version "3.0.2"
resolved "https://registry.yarnpkg.com/rollup-plugin-inject/-/rollup-plugin-inject-3.0.2.tgz#e4233855bfba6c0c12a312fd6649dff9a13ee9f4"
Expand Down Expand Up @@ -804,25 +774,6 @@ strip-json-comments@~2.0.1:
resolved "https://registry.yarnpkg.com/strip-json-comments/-/strip-json-comments-2.0.1.tgz#3c531942e908c2697c0ec344858c286c7ca0a60a"
integrity sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==

svix-fetch@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/svix-fetch/-/svix-fetch-3.0.0.tgz#c13e20b69ceb3ad43b52dd3933ec30bf278c571d"
integrity sha512-rcADxEFhSqHbraZIsjyZNh4TF6V+koloX1OzZ+AQuObX9mZ2LIMhm1buZeuc5BIZPftZpJCMBsSiBaeszo9tRw==
dependencies:
node-fetch "^2.6.1"
whatwg-fetch "^3.4.1"

svix@^1.8.1:
version "1.8.1"
resolved "https://registry.yarnpkg.com/svix/-/svix-1.8.1.tgz#ddc9da3960a523372ab3156fd4bbe7fc304166aa"
integrity sha512-pIuIWPQqZVZKlnfokH0Y1OnWXKZBxE6s5DSVudHPKSU66D0M/OOvCX9dC34xbq1SUVT8uFllit6Ek3PGEJ77xA==
dependencies:
"@stablelib/base64" "^1.0.0"
es6-promise "^4.2.4"
fast-sha256 "^1.3.0"
svix-fetch "^3.0.0"
url-parse "^1.4.3"

tar-fs@^2.0.0:
version "2.1.1"
resolved "https://registry.yarnpkg.com/tar-fs/-/tar-fs-2.1.1.tgz#489a15ab85f1f0befabb370b7de4f9eb5cbe8784"
Expand Down Expand Up @@ -875,14 +826,6 @@ undici@^5.13.0:
dependencies:
busboy "^1.6.0"

url-parse@^1.4.3:
version "1.5.10"
resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.5.10.tgz#9d3c2f736c1d75dd3bd2be507dcc111f1e2ea9c1"
integrity sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==
dependencies:
querystringify "^2.1.1"
requires-port "^1.0.0"

util-deprecate@^1.0.1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf"
Expand Down

0 comments on commit 24b9f13

Please sign in to comment.