Skip to content

Commit

Permalink
Package
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-lippert committed Aug 24, 2023
1 parent a221a1b commit 57af0e0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
27 changes: 17 additions & 10 deletions UpstashKafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,55 @@ export class UpstashKafka {
}

async listQueues() {
let data = await fetch(`https://${this.baseUrl}/topics`, {
const data = await fetch(`https://${this.baseUrl}/topics`, {
headers: { Authorization: 'Basic ' + this.auth },
}).then((response) => response.json())

data = await fetch(`https://${this.baseUrl}/offsets/latest`, {
return await fetch(`https://${this.baseUrl}/offsets/latest`, {
headers: { Authorization: 'Basic ' + this.auth },
method: 'POST',
body: JSON.stringify(Object.entries(data).flatMap(([topic, partitions]) => Array.from(Array(partitions).keys()).map((partition) => ({ topic, partition })))),
}).then((response) => response.json())
return data
})
.then((response) => response.json())
.then((t) => t.map(({ topic, partition, offset }) => ({ queue: topic, partition, offset })))
}

async send(queue, message) {
return await fetch(`https://${this.baseUrl}/produce/${queue}/${message}`, {
headers: { Authorization: 'Basic ' + this.auth },
}).then((response) => response.json())
}).then((response) => this.formatResponse(response))
}

async sendBatch(queue, messages) {
return await fetch(`https://${this.baseUrl}/produce/${queue}`, {
headers: { Authorization: 'Basic ' + this.auth },
method: 'POST',
body: JSON.stringify(messages.map((value) => ({ value }))),
}).then((response) => response.json())
}).then((response) => this.formatResponse(response))
}

async queue(queue, group = 'GROUP_1', partition = '0') {
return await fetch(`https://${this.baseUrl}/consume/${group}/${partition}/${queue}`, {
headers: { Authorization: 'Basic ' + this.auth },
}).then((response) => response.json())
}).then((response) => this.formatResponse(response))
}

async fetch(queue, partition, offset) {
const response = await fetch(`https://${this.baseUrl}/fetch`, {
return await fetch(`https://${this.baseUrl}/fetch`, {
headers: { Authorization: `Basic ${auth}` },
method: 'POST',
body: JSON.stringify({
topic: queue,
partition,
offset,
}),
})
return await response.json()
}).then((response) => this.formatResponse(response))
}

async formatResponse(response) {
let value = await response.json()
value = { queue: value.topic, ...value }
delete value.topic
return value
}
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"name": "@drivly/kafka.do",
"version": "0.0.1",
"description": "Cloudflare Worker API for Kafka with webhooks",
"private": true,
"scripts": {
"deploy": "wrangler deploy",
"dev": "wrangler dev",
Expand All @@ -21,5 +20,8 @@
"printWidth": 200,
"semi": false,
"tabWidth": 2
},
"publishConfig": {
"access": "restricted"
}
}

0 comments on commit 57af0e0

Please sign in to comment.