Class StreamsRegistry

  • All Implemented Interfaces:
    AutoCloseable

    public class StreamsRegistry
    extends Object
    implements AutoCloseable
    The StreamsRegistry class serves as a centralized registry for one or more Kafka Streams applications. Callers use this class to register their Kafka Streams applications, enabling integration with Kpow's advanced Kafka Streams features.

    A single instance of StreamsRegistry can manage multiple Kafka Streams applications. Each registered application is uniquely identified and can be individually managed (e.g., unregistered) through the StreamsRegistry.StreamsAgent returned upon registration.

    All registered Kafka Streams applications will be periodically observed, and their metrics will be collected and sent to Kpow's internal Kafka topic. This enables real-time monitoring and insights into the performance and health of your applications.

    This class implements AutoCloseable. Closing the registry deregisters all registered Kafka Streams applications and stops their telemetry.
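    The lifecycle described above (one registry, several applications, a handle per registration, and close() clearing everything) can be sketched with a toy in-memory model. This is not Kpow's implementation: ToyRegistry, Agent, and size() are hypothetical stand-ins, applications are represented by plain names, and ignoring null or unknown handles follows the note in the unregister documentation.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for StreamsRegistry; it models the documented contract only. */
final class ToyRegistry implements AutoCloseable {

    /** Stand-in for StreamsRegistry.StreamsAgent: an opaque handle with a unique id. */
    static final class Agent {
        private final String id = UUID.randomUUID().toString();

        String id() {
            return id;
        }
    }

    // Maps each agent id to the (hypothetical) application name it was issued for.
    private final Map<String, String> registered = new ConcurrentHashMap<>();

    /** Registers an application, represented here by its name, and returns its handle. */
    Agent register(String applicationName) {
        if (applicationName == null) {
            throw new IllegalArgumentException("applicationName must not be null");
        }
        Agent agent = new Agent();
        registered.put(agent.id(), applicationName);
        return agent;
    }

    /** Removes one application; null or unknown handles are ignored in this sketch. */
    void unregister(Agent agent) {
        if (agent != null) {
            registered.remove(agent.id());
        }
    }

    /** Number of applications currently registered. */
    int size() {
        return registered.size();
    }

    /** Closing the registry deregisters every remaining application. */
    @Override
    public void close() {
        registered.clear();
    }
}

final class ToyRegistryDemo {
    public static void main(String[] args) {
        // try-with-resources guarantees close() runs when the block exits.
        try (ToyRegistry registry = new ToyRegistry()) {
            ToyRegistry.Agent orders = registry.register("orders-app");
            ToyRegistry.Agent payments = registry.register("payments-app");
            registry.unregister(orders); // only "payments-app" remains
            System.out.println(registry.size()); // prints 1
        }
    }
}
```

    With the real class, the same try-with-resources shape should apply because StreamsRegistry implements AutoCloseable; only the register arguments differ (a KafkaStreams instance, its Topology, and a KeyStrategy).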
    • Constructor Detail

      • StreamsRegistry

        public StreamsRegistry(Properties props,
                               MetricFilter metricsFilter)
        Constructs a StreamsRegistry instance using the specified Kafka properties and metrics filter.

        This constructor initializes the registry, allowing Kafka Streams applications to be registered for monitoring through Kpow's user interface and API. The provided Properties object is used to configure the underlying Kafka producer, and the MetricFilter determines which metrics are collected and sent to Kpow's internal Kafka topic.

        Important: The Kafka producer properties provided in props must match the connection details of Kpow's primary cluster, where Kpow's internal topic resides.

        Parameters:
        props - the Properties object containing Kafka configuration. Must include essential properties like bootstrap.servers.
        metricsFilter - the MetricFilter to customize which metrics are reported. Use MetricFilter.defaultMetricFilter() for default behavior.
        Throws:
        IllegalArgumentException - if the provided props are invalid or incomplete.
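        As a minimal sketch of preparing the props argument, the helper below builds a Properties object and fails fast when bootstrap.servers is missing. RegistryProps and forPrimaryCluster are hypothetical names, not part of the Kpow API, and the check is a caller-side precaution rather than a description of what the constructor validates. Any security settings your cluster needs would be added the same way.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the Kpow API. */
final class RegistryProps {
    private RegistryProps() {
    }

    /**
     * Builds the Properties to pass to the StreamsRegistry constructor.
     * bootstrapServers should point at Kpow's primary cluster, where
     * Kpow's internal topic resides.
     */
    static Properties forPrimaryCluster(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers is required");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }
}
```

        The result can then be handed to either constructor, for example together with MetricFilter.defaultMetricFilter() for the default reporting behavior.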
      • StreamsRegistry

        public StreamsRegistry(Properties props)
        Constructs a StreamsRegistry instance using the specified Kafka connection properties and the default MetricFilter.

        This constructor simplifies the initialization process by applying the default MetricFilter, which collects and reports all relevant metrics to the internal Kpow Kafka topic. The provided Properties object is used to configure the underlying Kafka producer.

        Important: The Kafka producer properties provided in props must match the connection details of Kpow's primary cluster, where Kpow's internal topic resides.

        Parameters:
        props - the Properties object containing Kafka configuration. Must include essential properties like bootstrap.servers.
        Throws:
        IllegalArgumentException - if the provided props are invalid or incomplete.
        See Also:
        MetricFilter.defaultMetricFilter()
    • Method Detail

      • filterProperties

        public static Properties filterProperties(Properties props)
        Filters a Properties object to retain only allowed Kafka properties. Default values for the compression and idempotence properties are added when they are not provided.
        Parameters:
        props - the input Properties containing Kafka configuration.
        Returns:
        a filtered Properties object containing only the allowed properties.
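        The behavior described here, an allow-list filter followed by default-filling, might look roughly like the sketch below. It is an illustration of the technique, not the library's code: FilterSketch, the contents of the allow-list, and both default values are assumptions, since this documentation does not list them. Only the property names compression.type and enable.idempotence are standard Kafka producer settings.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/** Illustrative allow-list filter; not the implementation of filterProperties. */
final class FilterSketch {
    // Assumed allow-list; the real set of allowed properties is not documented here.
    private static final Set<String> ALLOWED = new HashSet<>(Arrays.asList(
            "bootstrap.servers",
            "security.protocol",
            "compression.type",
            "enable.idempotence"));

    private FilterSketch() {
    }

    static Properties filter(Properties input) {
        Properties out = new Properties();
        // Copy only the allowed keys; everything else is dropped.
        for (String name : input.stringPropertyNames()) {
            if (ALLOWED.contains(name)) {
                out.setProperty(name, input.getProperty(name));
            }
        }
        // Fill defaults only when the caller did not supply a value.
        // Both default values below are placeholders chosen for this sketch.
        if (out.getProperty("compression.type") == null) {
            out.setProperty("compression.type", "gzip");
        }
        if (out.getProperty("enable.idempotence") == null) {
            out.setProperty("enable.idempotence", "false");
        }
        return out;
    }
}
```

        In this sketch the input object is left untouched and a new Properties is returned, so a Kafka Streams configuration could be passed in and reduced to producer-relevant settings without side effects. Whether the real method copies or mutates its argument is not stated in this documentation.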
      • register

        public StreamsRegistry.StreamsAgent register(org.apache.kafka.streams.KafkaStreams streams,
                                                     org.apache.kafka.streams.Topology topology,
                                                     KeyStrategy keyStrategy)
        Registers a Kafka Streams application with the registry for monitoring.

        Once registered, the Kafka Streams application will be periodically observed, and its metrics will be collected and sent to an internal Kpow Kafka topic. These metrics enable real-time insights into the application's performance, resource usage, and health from within Kpow's user interface and API.

        The registration process associates the application with a unique identifier encapsulated in a StreamsRegistry.StreamsAgent. This identifier can later be used to unregister the application when it is no longer needed.

        Parameters:
        streams - the KafkaStreams instance representing the application to be registered.
        topology - the Topology of the Kafka Streams application, which defines its processing logic.
        keyStrategy - the KeyStrategy defining the keying mechanism for metrics data written to Kpow's internal Kafka topic.
        Returns:
        a StreamsRegistry.StreamsAgent representing the registered application, or null if registration fails.
        Throws:
        IllegalArgumentException - if any of the provided parameters are null.
        See Also:
        unregister(StreamsAgent)
      • unregister

        public void unregister(StreamsRegistry.StreamsAgent streamsAgent)
        Unregisters a previously registered Kafka Streams application from the registry.

        Unregistering a Kafka Streams application removes it from active monitoring. After unregistration, no further metrics or observations will be collected or reported for the application. This method is typically used when an application is being stopped or removed.

        The unregistration process requires a StreamsRegistry.StreamsAgent, which is provided when the application is initially registered using the register(KafkaStreams, Topology, KeyStrategy) method. Each StreamsAgent contains a unique identifier corresponding to the registered application.

        Note: If the provided StreamsAgent is null, or if it does not correspond to a currently registered application, this method will have no effect. It is the caller's responsibility to ensure that the provided StreamsAgent is valid and matches a previously registered application.

        Parameters:
        streamsAgent - the StreamsRegistry.StreamsAgent representing the application to be unregistered. Must not be null.
        Throws:
        IllegalArgumentException - if the StreamsAgent is invalid or does not correspond to a currently registered application.
        See Also:
        register(KafkaStreams, Topology, KeyStrategy)