Class StreamsRegistry
java.lang.Object
    io.factorhouse.kpow.StreamsRegistry

All Implemented Interfaces:
AutoCloseable

public class StreamsRegistry extends Object implements AutoCloseable
The StreamsRegistry class serves as a centralized registry for one or more Kafka Streams applications. Client code uses this class to register its Kafka Streams applications, enabling integration with Kpow's advanced Kafka Streams features.

A single instance of StreamsRegistry can manage multiple Kafka Streams applications. Each registered application is uniquely identified and can be individually managed (e.g., unregistered) through the StreamsRegistry.StreamsAgent returned upon registration.

All registered Kafka Streams applications are periodically observed, and their metrics are collected and sent to Kpow's internal Kafka topic. This enables real-time monitoring of, and insight into, the performance and health of your applications.

This class implements AutoCloseable. Closing the registry deregisters all registered Kafka Streams applications and stops their telemetry.

Nested Class Summary
Nested Classes
Modifier and Type    Class                           Description
static class         StreamsRegistry.StreamsAgent    Represents an agent for a registered Kafka Streams application.

Constructor Summary
Constructors
Constructor and Description
StreamsRegistry(Properties props)
    Constructs a StreamsRegistry instance using the specified Kafka connection properties and the default MetricFilter.
StreamsRegistry(Properties props, MetricFilter metricsFilter)
    Constructs a StreamsRegistry instance using the specified Kafka properties and metrics filter.

Method Summary
Modifier and Type, Method and Description
void
    close()
static Properties
    filterProperties(Properties props)
    Filters a Properties object to retain only allowed Kafka properties.
StreamsRegistry.StreamsAgent
    register(org.apache.kafka.streams.KafkaStreams streams, org.apache.kafka.streams.Topology topology, KeyStrategy keyStrategy)
    Registers a Kafka Streams application with the registry for monitoring.
void
    unregister(StreamsRegistry.StreamsAgent streamsAgent)
    Unregisters a previously registered Kafka Streams application from the registry.

Constructor Detail
StreamsRegistry
public StreamsRegistry(Properties props, MetricFilter metricsFilter)
Constructs a StreamsRegistry instance using the specified Kafka properties and metrics filter.

This constructor initializes the registry, allowing Kafka Streams applications to be registered for monitoring through Kpow's user interface and API. The provided Properties object is used to configure the underlying Kafka producer, and the MetricFilter determines which metrics are collected and sent to Kpow's internal Kafka topic.

Important: The Kafka producer properties provided in props must match the connection details of Kpow's primary cluster, where Kpow's internal topic resides.

Parameters:
props - the Properties object containing Kafka configuration. Must include essential properties like bootstrap.servers.
metricsFilter - the MetricFilter to customize which metrics are reported. Use MetricFilter.defaultMetricFilter() for default behavior.
Throws:
IllegalArgumentException - if the provided props are invalid or incomplete.

StreamsRegistry
public StreamsRegistry(Properties props)
Constructs a StreamsRegistry instance using the specified Kafka connection properties and the default MetricFilter.

This constructor simplifies the initialization process by applying the default MetricFilter, which collects and reports all relevant metrics to the internal Kpow Kafka topic. The provided Properties object is used to configure the underlying Kafka producer.

Important: The Kafka producer properties provided in props must match the connection details of Kpow's primary cluster, where Kpow's internal topic resides.

Parameters:
props - the Properties object containing Kafka configuration. Must include essential properties like bootstrap.servers.
Throws:
IllegalArgumentException - if the provided props are invalid or incomplete.
See Also:
MetricFilter.defaultMetricFilter()
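
Both constructors take a Properties object that must carry at least bootstrap.servers for Kpow's primary cluster. The following is a minimal sketch of assembling such an object using only java.util.Properties. The class name KpowProducerProps, the helper forPrimaryCluster, and the broker address are hypothetical placeholders, not part of the Kpow API; any security settings the cluster requires would have to be added alongside.

```java
import java.util.Properties;

public class KpowProducerProps {

    // Hypothetical helper: builds connection properties for the cluster
    // that hosts Kpow's internal topic (Kpow's primary cluster).
    public static Properties forPrimaryCluster(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers must be provided");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed address used only for illustration.
        Properties props = forPrimaryCluster("localhost:9092");
        // This object is what would be handed to new StreamsRegistry(props).
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting object would then be passed to either constructor, for example new StreamsRegistry(props).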
Method Detail
filterProperties
public static Properties filterProperties(Properties props)
Filters a Properties object to retain only allowed Kafka properties. Additional default values for compression and idempotence properties are set if not provided.

Parameters:
props - the input Properties containing Kafka configuration.
Returns:
a filtered Properties object containing only the allowed properties.
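
The filtering described above can be pictured as an allow-list pass followed by defaulting. The sketch below illustrates that technique only and is not Kpow's implementation: the allow-list contents and the default values ("gzip", "true") are assumptions, since this page does not state them. The keys compression.type and enable.idempotence are standard Kafka producer settings.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class PropertyFilterSketch {

    // Hypothetical allow-list; the keys Kpow actually retains are not listed on this page.
    private static final Set<String> ALLOWED = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "security.protocol", "sasl.mechanism",
            "compression.type", "enable.idempotence"));

    public static Properties filter(Properties input) {
        Properties out = new Properties();
        // Copy only the keys that appear in the allow-list.
        for (String key : input.stringPropertyNames()) {
            if (ALLOWED.contains(key)) {
                out.setProperty(key, input.getProperty(key));
            }
        }
        // Assumed defaults, applied only when the caller did not set a value.
        if (!out.containsKey("compression.type")) {
            out.setProperty("compression.type", "gzip");
        }
        if (!out.containsKey("enable.idempotence")) {
            out.setProperty("enable.idempotence", "true");
        }
        return out;
    }

    public static void main(String[] args) {
        Properties in = new Properties();
        in.setProperty("bootstrap.servers", "localhost:9092");
        in.setProperty("application.id", "my-streams-app"); // not in the allow-list, so dropped
        System.out.println(filter(in).stringPropertyNames());
    }
}
```

A value supplied by the caller for an allowed key is kept as-is; the defaults only fill gaps.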
register
public StreamsRegistry.StreamsAgent register(org.apache.kafka.streams.KafkaStreams streams, org.apache.kafka.streams.Topology topology, KeyStrategy keyStrategy)
Registers a Kafka Streams application with the registry for monitoring.

Once registered, the Kafka Streams application will be periodically observed, and its metrics will be collected and sent to an internal Kpow Kafka topic. These metrics enable real-time insights into the application's performance, resource usage, and health from within Kpow's user interface and API.

The registration process associates the application with a unique identifier encapsulated in a StreamsRegistry.StreamsAgent. This identifier can later be used to unregister the application when it is no longer needed.

Parameters:
streams - the KafkaStreams instance representing the application to be registered.
topology - the Topology of the Kafka Streams application, which defines its processing logic.
keyStrategy - the KeyStrategy defining the keying mechanism for metrics data written to Kpow's internal Kafka topic.
Returns:
a StreamsRegistry.StreamsAgent representing the registered application, or null if registration fails.
Throws:
IllegalArgumentException - if any of the provided parameters are null.
See Also:
unregister(StreamsAgent)

unregister
public void unregister(StreamsRegistry.StreamsAgent streamsAgent)
Unregisters a previously registered Kafka Streams application from the registry.

Unregistering a Kafka Streams application removes it from active monitoring. After unregistration, no further metrics or observations will be collected or reported for the application. This method is typically used when an application is being stopped or removed.

The unregistration process requires a StreamsRegistry.StreamsAgent, which is provided when the application is initially registered using the register(KafkaStreams, Topology, KeyStrategy) method. Each StreamsAgent contains a unique identifier corresponding to the registered application.

Note: If the provided StreamsAgent is null, or if it does not correspond to a currently registered application, this method will have no effect. It is the caller's responsibility to ensure that the provided StreamsAgent is valid and matches a previously registered application.

Parameters:
streamsAgent - the StreamsRegistry.StreamsAgent representing the application to be unregistered. Must not be null.
Throws:
IllegalArgumentException - if the StreamsAgent is invalid or does not correspond to a currently registered application.
See Also:
register(KafkaStreams, Topology, KeyStrategy)

close
public void close()
Specified by:
close in interface AutoCloseable
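
Because the registry is AutoCloseable, it can be scoped with try-with-resources so that everything still registered is dropped when the block exits. The sketch below models that lifecycle with stand-in types so it runs without Kpow or Kafka on the classpath: DemoRegistry and Agent are hypothetical substitutes for StreamsRegistry and StreamsRegistry.StreamsAgent, and applications are represented by plain names. It follows the note in the unregister description that a null or unknown agent has no effect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class RegistryLifecycleSketch {

    // Stand-in for StreamsRegistry.StreamsAgent: carries only a unique id.
    public static final class Agent {
        final String id = UUID.randomUUID().toString();
    }

    // Stand-in for StreamsRegistry: tracks application names keyed by agent id.
    public static final class DemoRegistry implements AutoCloseable {
        private final Map<String, String> apps = new LinkedHashMap<>();

        public Agent register(String appName) {
            if (appName == null) {
                throw new IllegalArgumentException("appName must not be null");
            }
            Agent agent = new Agent();
            apps.put(agent.id, appName);
            return agent;
        }

        public void unregister(Agent agent) {
            // A null or already-removed agent is ignored.
            if (agent != null) {
                apps.remove(agent.id);
            }
        }

        public int size() {
            return apps.size();
        }

        @Override
        public void close() {
            // Closing drops every remaining registration.
            apps.clear();
        }
    }

    public static void main(String[] args) {
        try (DemoRegistry registry = new DemoRegistry()) {
            Agent orders = registry.register("orders-app");
            registry.register("payments-app");
            registry.unregister(orders);         // removes one application
            System.out.println(registry.size()); // prints 1
        } // close() runs here and clears the remaining registration
    }
}
```

With the real class, the same shape would apply: construct the registry in the try header, call register for each KafkaStreams instance, and let close() handle whatever is still registered.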