Skip to content

Commit

Permalink
Add concurrency groups
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-lippert committed Aug 22, 2023
1 parent a0834f3 commit c1b03ca
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
node_modules
node_modules
.dev.vars
35 changes: 29 additions & 6 deletions worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,13 @@ router.get('/', async (request) => {
Authorization: 'Basic ' + request.auth,
},
}).then((response) => response.json())
data = Object.fromEntries(Object.entries(data).map(([key, value]) => [key, { partitions: value }]))
return json({ api: request.api, data, user: request.ctx.user })
})

router.get('/:queue', async (request) => {
const { queue } = request.params
let data = await fetch(`https://${request.env.UPSTASH_KAFKA_SERVER}/consume/GROUP_1/INSTANCE_1/${queue}`, {
data = await fetch(`https://${request.env.UPSTASH_KAFKA_SERVER}/offsets/latest`, {
headers: {
Authorization: 'Basic ' + request.auth,
},
method: 'POST',
body: JSON.stringify(Object.entries(data).flatMap(([topic, partitions]) => Array.from(Array(partitions).keys()).map((partition) => ({ topic, partition })))),
}).then((response) => response.json())
return json({ api: request.api, data, user: request.ctx.user })
})
Expand Down Expand Up @@ -77,6 +74,32 @@ router.post('/:queue/sendBatch', async (request) => {
return json({ api: request.api, data, user: request.ctx.user })
})

async function queue(baseUrl, auth, queue, partition, instance) {
return await fetch(`https://${baseUrl}/consume/${partition}/${instance}/${queue}`, {
headers: {
Authorization: 'Basic ' + auth,
},
}).then((response) => response.json())
}

router.get('/:queue', async (request) => {
const { queue: topic } = request.params
let data = await queue(request.env.UPSTASH_KAFKA_SERVER, request.auth, topic, 'GROUP_1', 'INSTANCE_1')
return json({ api: request.api, data, user: request.ctx.user })
})

router.get('/:queue/:partition', async (request) => {
const { queue: topic, partition } = request.params
let data = await queue(request.env.UPSTASH_KAFKA_SERVER, request.auth, topic, partition, 'INSTANCE_1')
return json({ api: request.api, data, user: request.ctx.user })
})

router.get('/:queue/:partition/:instance', async (request) => {
const { queue: topic, partition, instance } = request.params
let data = await queue(request.env.UPSTASH_KAFKA_SERVER, request.auth, topic, partition, instance)
return json({ api: request.api, data, user: request.ctx.user })
})

router.get('*', (request) => error(404, { api: request.api, error: 'Not Found', user: request.ctx.user }))

export default {
Expand Down

0 comments on commit c1b03ca

Please sign in to comment.