Skip to content

Commit

Permalink
Merge pull request #125 from lpsinger/heartbeat
Browse files Browse the repository at this point in the history
Add heartbeat topic, `gcn.heartbeat`
  • Loading branch information
dakota002 authored Jul 25, 2024
2 parents 95fb5f5 + 7f773f3 commit 388232b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
7 changes: 6 additions & 1 deletion gcn_classic_to_kafka/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# SPDX-License-Identifier: Apache-2.0
#
"""Command line interface."""

import asyncio
import logging
import urllib
Expand All @@ -17,6 +18,7 @@
import prometheus_client

from .socket import client_connected
from . import heartbeat
from . import metrics

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -94,7 +96,10 @@ async def serve():
async with server:
await server.serve_forever()

async def run():
await asyncio.gather(asyncio.gather(heartbeat.run(producer), serve()))

# Exit cleanly on SIGTERM
signal.signal(signal.SIGTERM, signal_handler)

asyncio.run(serve())
asyncio.run(run())
39 changes: 39 additions & 0 deletions gcn_classic_to_kafka/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Copyright © 2023 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import datetime
import json
import logging

import confluent_kafka

log = logging.getLogger(__name__)

TIMEOUT = 1
TOPIC = "gcn.heartbeat"


async def run(producer: confluent_kafka.Producer):
"""Produce a heartbeat message once per second."""
log.info("Producing heartbeats every %d second(s) on topic %s", TIMEOUT, TOPIC)
while True:
producer.produce(
TOPIC,
json.dumps(
{
"$schema": "https://gcn.nasa.gov/docs/schema/v4.1.0/gcn/notices/core/Alert.schema.json",
"alert_datetime": datetime.datetime.now(datetime.UTC).isoformat(),
}
),
)

# Wait for any outstanding messages to be delivered and delivery
# report callbacks to be triggered.
producer.poll(0)

await asyncio.sleep(TIMEOUT)

0 comments on commit 388232b

Please sign in to comment.