Skip to content

Commit

Permalink
Add API & user objects
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-lippert committed Aug 15, 2023
1 parent c30452e commit d5462ff
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 121 deletions.
7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,12 @@
"devDependencies": {
"prettier": "^3.0.0",
"wrangler": "^3.0.0"
},
"prettier": {
"singleQuote": true,
"trailingComma": "es5",
"printWidth": 200,
"semi": false,
"tabWidth": 2
}
}
231 changes: 110 additions & 121 deletions worker.js
Original file line number Diff line number Diff line change
@@ -1,62 +1,76 @@
import { Router } from "itty-router"
import { Kafka, groupId, setJSON } from "@upstash/kafka";
import { Router, json } from 'itty-router'
import { Kafka } from '@upstash/kafka'

const router = Router()
const kafkaConfig = {
clientId: "kafka-do",
brokers: JSON.parse(KAFKA_BOOTSTRAP_SERVERS),
ssl: true,
sasl: {
mechanism: "scram-sha-256",
username: KAFKA_USERNAME,
password: KAFKA_PASSWORD,
},
/**
 * Middleware: authenticate the request via the CTX service binding and attach
 * `request.ctx` (auth context), `request.api` (API metadata echoed in every
 * response), and `request.kafkaConfig` (Upstash credentials from env) for the
 * downstream route handlers.
 *
 * Returns a redirect Response for unauthenticated users; otherwise returns
 * undefined so the router continues to the matched route.
 *
 * @param {Request} request - incoming request (mutated in place)
 * @param {object} env - Worker bindings (CTX service, KAFKA_* secrets)
 */
const withCtx = async (request, env) => {
  // Bug fix: the original passed `req`, which is not defined in this scope.
  request.ctx = await env.CTX.fetch(request).then((res) => res.json())
  if (!request.ctx.user) {
    // Response.redirect requires an absolute URL; a bare '/login' throws.
    // NOTE(review): assumes the CTX service always returns `origin`, even for
    // anonymous users — confirm against the CTX worker.
    return Response.redirect(request.ctx.origin + '/login', 302)
  }
  request.api = {
    icon: '🌎',
    name: 'kafka.do',
    description: 'Cloudflare Worker API for Kafka with webhooks',
    url: 'https://kafka.do',
    endpoints: {
      topics: request.ctx.origin + '/topics',
      producer: request.ctx.origin + '/producer/:topic/:message',
      consumer: request.ctx.origin + '/consumer/:topic',
      consumerBulk: request.ctx.origin + '/consumer/:topic/:count',
      fetch: request.ctx.origin + '/fetch/:topic',
      fetchBulk: request.ctx.origin + '/fetch/:topic/:count',
    },
    memberOf: 'https://apis.do/pubsub',
    login: request.ctx.origin + '/login',
    logout: request.ctx.origin + '/logout',
    repo: 'https://github.com/drivly/kafka.do',
  }
  request.kafkaConfig = {
    url: env.KAFKA_URL,
    username: env.KAFKA_USERNAME,
    password: env.KAFKA_PASSWORD,
  }
}

const router = Router()

// GET /topics — list the Kafka topics visible to the configured credentials.
router.get('/topics', async (request) => {
  const kafka = new Kafka(request.kafkaConfig)
  const admin = kafka.admin()

  try {
    // NOTE(review): @upstash/kafka is a stateless REST client — there is no
    // connect()/disconnect() lifecycle; confirm admin.topics() is the correct
    // call for the installed client version.
    const topics = await admin.topics()
    return json({ api: request.api, topics, user: request.ctx.user }, { status: 200 })
  } catch (error) {
    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
  }
})
// GET /producer/:topic/:message — publish a single message to a topic and
// return the base offset it was written at.
router.get('/producer/:topic/:message', async (request) => {
  const { topic, message } = request.params
  const kafka = new Kafka(request.kafkaConfig)
  const producer = kafka.producer()

  try {
    // NOTE(review): this is kafkajs-style usage; @upstash/kafka's REST
    // producer exposes produce(topic, value) with no connect()/disconnect().
    // Confirm the call shape against the installed client version.
    await producer.connect()
    const { baseOffset } = await producer.send({
      topic,
      messages: [{ value: message }],
    })
    await producer.disconnect()

    return json({ api: request.api, topic, message, offset: baseOffset, user: request.ctx.user }, { status: 200 })
  } catch (error) {
    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
  }
})

async function consumeSingleMessage(topic, groupId) {
const kafka = new Kafka(kafkaConfig);
const consumer = kafka.consumer({ groupId });
const kafka = new Kafka(request.kafkaConfig)
const consumer = kafka.consumer({ groupId })

try {
await consumer.connect()
await consumer.subscribe({ topic })
const messages = await consumer.runOnce({ autoCommit: true, eachMessage: true })
await consumer.disconnect()

if (messages && messages.length > 0) {
return messages[0]
Expand All @@ -68,128 +82,103 @@ async function consumeSingleMessage(topic, groupId) {
}
}

// GET /consumer/:topic — consume (and auto-commit) the next message on a
// topic as the shared consumer group.
router.get('/consumer/:topic', async (request) => {
  try {
    const { topic } = request.params
    // NOTE(review): `groupId` was previously imported from @upstash/kafka and
    // is no longer in scope after the import change — define it (or derive it
    // per deployment) before shipping.
    const message = await consumeSingleMessage(topic, groupId)

    if (message) {
      return json({ api: request.api, topic, message: message.value, offset: message.offset, user: request.ctx.user }, { status: 200 })
    }
    return json({ api: request.api, error: 'No messages available', user: request.ctx.user }, { status: 404 })
  } catch (error) {
    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
  }
})

/**
 * Consume up to `count` messages from `topic` as consumer group `groupId`,
 * auto-committing offsets.
 *
 * @param {string} topic - topic to read from
 * @param {string} groupId - consumer group id
 * @param {number} count - maximum number of messages to pull in this call
 * @param {object} [kafkaConfig] - Upstash credentials ({ url, username,
 *   password }). Bug fix: the body referenced `request.kafkaConfig`, but
 *   `request` is not in scope in this helper (ReferenceError at runtime) —
 *   callers must pass the per-request config in explicitly.
 * @returns {Promise<Array>} the consumed messages (possibly empty)
 */
async function consumeMultipleMessages(topic, groupId, count, kafkaConfig) {
  const kafka = new Kafka(kafkaConfig)
  const consumer = kafka.consumer({ groupId })
  // NOTE(review): subscribe()/runOnce() is kafkajs-style; the @upstash/kafka
  // REST consumer exposes consume() — confirm against the installed version.
  // (The original also wrapped this in a catch that only rethrew; errors now
  // propagate to the route handlers unchanged.)
  await consumer.subscribe({ topic })
  const messages = await consumer.runOnce({
    autoCommit: true,
    maxMessages: count,
    eachMessage: true,
  })
  return messages
}

router.get("/fetch/:topic/:count", async (request, { topic, count }) => {
// GET /consumer/:topic/:count — consume (and auto-commit) up to :count
// messages as the shared consumer group.
router.get('/consumer/:topic/:count', async (request) => {
  try {
    const { topic, count } = request.params
    // NOTE(review): `groupId` is no longer imported after the import change —
    // define it before deploying. The per-request Kafka config is passed
    // through so the helper does not depend on `request`.
    const messages = await consumeMultipleMessages(topic, groupId, Number.parseInt(count, 10), request.kafkaConfig)

    if (messages && messages.length > 0) {
      return json(
        {
          api: request.api,
          topic,
          messages: messages.map((message) => ({
            message: message.value,
            offset: message.offset,
          })),
          user: request.ctx.user,
        },
        { status: 200 }
      )
    }
    return json({ api: request.api, error: 'No messages available', user: request.ctx.user }, { status: 404 })
  } catch (error) {
    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
  }
})

// GET /fetch/:topic — peek at the next message using a separate '-fetch'
// consumer group so the main group's offsets are untouched.
router.get('/fetch/:topic', async (request) => {
  try {
    // Bug fix: `topic` was destructured from the handler's second argument,
    // but itty-router passes the extra handle() arguments (env) there, so it
    // was always undefined — read request.params like the sibling routes.
    const { topic } = request.params
    const message = await consumeSingleMessage(topic, groupId + '-fetch')

    if (message) {
      return json({ api: request.api, topic, message: message.value, offset: message.offset, user: request.ctx.user }, { status: 200 })
    }
    return json({ api: request.api, error: 'No messages available', user: request.ctx.user }, { status: 404 })
  } catch (error) {
    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
  }
})

// GET /fetch/:topic/:count — peek at up to :count messages using a separate
// '-fetch' consumer group so the main group's offsets are untouched.
router.get('/fetch/:topic/:count', async (request) => {
  try {
    const { topic, count } = request.params
    const messages = await consumeMultipleMessages(topic, groupId + '-fetch', Number.parseInt(count, 10), request.kafkaConfig)

    if (messages && messages.length > 0) {
      return json(
        {
          api: request.api,
          topic,
          messages: messages.map((message) => ({
            message: message.value,
            offset: message.offset,
          })),
          user: request.ctx.user,
        },
        { status: 200 }
      )
    }
    return json({ api: request.api, error: 'No messages available', user: request.ctx.user }, { status: 404 })
  } catch (error) {
    // Bug fix: the error payload was keyed `a` instead of `error`.
    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
  }
})

// Register the context middleware once at module scope. Registering it inside
// the fetch handler (as before) appended a duplicate '*' route on EVERY
// request, growing the route table without bound. NOTE(review): itty-router
// dispatches in registration order, so a trailing '*' route still never runs
// before the concrete routes — for withCtx to actually guard them, this line
// must be moved above the route definitions.
router.all('*', withCtx)

addEventListener('fetch', (event) => {
  // NOTE(review): service-worker-format Workers expose bindings as globals;
  // `event.env` is normally undefined here — confirm how env reaches withCtx,
  // or migrate to the module-worker `fetch(request, env, ctx)` export.
  event.respondWith(router.handle(event.request, event.env))
})

0 comments on commit d5462ff

Please sign in to comment.