Skip to content

kafka_consumer

main.management.commands.kafka_consumer

Django management command that stores messages consumed from Kafka in the application database.

Classes

Command

Bases: BaseCommand

Consumes messages from Kafka and stores them in the database.

Functions
add_arguments(parser)

Add command-line options.

Source code in main/management/commands/kafka_consumer.py
def add_arguments(self, parser: ArgumentParser) -> None:
    """Add command-line options.

    Args:
        parser: Parser on which the command's options are registered.
    """
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Print each received message and each expiry deletion to stdout.",
    )
handle(debug=False, **kwargs)

Command business logic.

Source code in main/management/commands/kafka_consumer.py
def handle(  # type: ignore[explicit-any]
    self,
    debug: bool = False,
    **kwargs: Any,
) -> None:
    """Consume Kafka messages indefinitely, storing them in the database.

    Subscribes to every topic matching one of the patterns in
    ``settings.KAFKA_TOPIC_REGEX`` and then loops forever. Each polled batch
    is converted to ``DruncMessage`` records and bulk-inserted; afterwards,
    records older than ``settings.MESSAGE_EXPIRE_SECS`` seconds are deleted.
    The consumer is closed when the loop is left through an exception or
    an interrupt.

    Args:
        debug: When true, write each received message and each expiry
            deletion to stdout.
        **kwargs: Remaining command options; not used here.
    """
    consumer = KafkaConsumer(bootstrap_servers=[settings.KAFKA_ADDRESS])
    try:
        consumer.subscribe(
            pattern=f"({'|'.join(settings.KAFKA_TOPIC_REGEX.values())})"
        )
        # TODO: determine why the below doesn't work
        # consumer.subscribe(pattern="control.no_session.process_manager")

        self.stdout.write("Listening for messages from Kafka.")
        while True:
            # poll() returns a mapping of topic partition -> list of records.
            for topic, messages in consumer.poll(timeout_ms=500).items():
                message_records = []

                # Topics whose name starts with "ers" use the ERS parser;
                # everything else goes through the generic Kafka parser.
                process_message = (
                    from_ers_message
                    if topic.topic.startswith("ers")
                    else from_kafka_message
                )

                for message in messages:
                    if debug:
                        self.stdout.write(f"Message received: {message}")
                        self.stdout.flush()

                    message_records.append(process_message(message))

                if message_records:
                    DruncMessage.objects.bulk_create(message_records)

            # Remove expired messages from the database.
            message_timeout = timedelta(seconds=settings.MESSAGE_EXPIRE_SECS)
            expire_time = datetime.now(tz=timezone.utc) - message_timeout
            query = DruncMessage.objects.filter(timestamp__lt=expire_time)
            # Count once and reuse the value for both the check and the log.
            expired_count = query.count()
            if expired_count:
                if debug:
                    self.stdout.write(
                        f"Deleting {expired_count} messages older than {expire_time}."
                    )
                query.delete()
    finally:
        # The loop only ends via an exception or interrupt; release the
        # consumer's connections on the way out.
        consumer.close()

Functions

from_ers_message(message)

Process an ERS-style message.

Parameters:

Name Type Description Default
message Any

Message to be processed.

required
Return

A DruncMessage object to be ingested by the database.

Source code in main/management/commands/kafka_consumer.py
def from_ers_message(message: Any) -> DruncMessage:  # type: ignore [explicit-any]
    """Build a DruncMessage from an ERS-style Kafka message.

    Args:
        message: Kafka record whose ``value`` is parsed as an ``IssueChain``
            and whose ``timestamp`` is given in milliseconds.

    Returns:
        A DruncMessage holding the topic, the UTC timestamp, the text of the
        chain's final issue and its upper-cased severity ("INFO" if empty).
    """
    # Kafka timestamps are in milliseconds; datetime expects seconds.
    seconds_since_epoch = message.timestamp / 1e3
    received_at = datetime.fromtimestamp(seconds_since_epoch, tz=timezone.utc)

    issue_chain = IssueChain()
    issue_chain.ParseFromString(message.value)

    topic = message.topic
    text = issue_chain.final.message
    severity = issue_chain.final.severity.upper()
    if not severity:
        severity = "INFO"

    return DruncMessage(
        topic=topic,
        timestamp=received_at,
        message=text,
        severity=severity,
    )

from_kafka_message(message)

Process a Kafka-style message.

Parameters:

Name Type Description Default
message Any

Message to be processed.

required
Return

A DruncMessage object to be ingested by the database.

Source code in main/management/commands/kafka_consumer.py
def from_kafka_message(message: Any) -> DruncMessage:  # type: ignore [explicit-any]
    """Build a DruncMessage from a Kafka-style (broadcast) message.

    Args:
        message: Kafka record whose ``value`` is parsed as a
            ``BroadcastMessage`` and whose ``timestamp`` is in milliseconds.

    Returns:
        A DruncMessage holding the topic, the UTC timestamp, the UTF-8
        decoded payload and the severity mapped from the broadcast type
        ("INFO" when the type has no mapping).
    """
    # Kafka timestamps are in milliseconds; datetime expects seconds.
    seconds_since_epoch = message.timestamp / 1e3
    received_at = datetime.fromtimestamp(seconds_since_epoch, tz=timezone.utc)

    broadcast = BroadcastMessage()
    broadcast.ParseFromString(message.value)

    topic = message.topic
    text = broadcast.data.value.decode("utf-8")
    severity = BROADCAST_TYPE_SEVERITY.get(broadcast.type, "INFO")

    return DruncMessage(
        topic=topic,
        timestamp=received_at,
        message=text,
        severity=severity,
    )