The Adelia subscriber (or consumer) is fully managed by the AMBSS. It handles its creation, starting, stopping and destruction.
A subscriber can be described either statically in a YAMLsubscribersConf.yml configuration file read when starting the AMBSS, or dynamically via the use of REST API.
Principles
An Adelia subscriber has two components:
a native subscriber: This is created using a Java library enabling consumer clients compatible with a specific broker to be managed. Its function is to connect to the broker, subscribe to topics and consume received messages.
an EADELIA program: This is called by the native subscriber each time a message is received. The program handles the message's payload as required and, according to the broker protocol and handling state, positively or negatively acknowledges the received message.
When starting the AMBSS, if the subscribersConf.yml YAML file exists, the description of each Adelia subscriber is loaded and the subscriber is created then started. The subscriber starts to handle messages if they are available.
An Adelia subscriber contains a certain amount of information:
global subscriber information (present in all subscribers),
information specific to the native underlying subscriber (topic connection and subscription information): this information varies according to the Java library used and the target broker.
However a subscriber creation is expressed (in YAML language via the subscribersConf.yml file or in JSON language via the REST API), the description information is the same.
Information relating to the associated EADELIA program.
objectFileName
Alphanumeric
Yes
Name of the object file produced by the 3GL generation of the associated EADELIA program.
javaPackage
Alphanumeric
No
Name of the Java package defined for the 3GL generation of the associated EADELIA program.
parameters
List of alphanumerics
No
List of values of parameters passed to the associated EADELIA program during execution. The order of the values must follow the parameter declaration order in the EADELIA program. The supported Adelia parameter types are ALPHA / NUM_BIN_2 / NUM_BIN_4 / NUM_BIN_8 / NUM_E / NUM_P / BOOL / DATE / TIME / TIMESTAMP. The format of the values according to type is as follows:
Type
Format
ALPHA
Alphanumeric string
NUM_BIN_2 / NUM_BIN_4 / NUM_BIN_8 / NUM_E / NUM_P
Numeric (decimal separator '.' without thousands separator)
Example: -1234.56
BOOL
"True" or "false" string
DATE
Alphanumeric string (text in quotation marks) in ISO date format "YYYY-MM-DD".
TIME
Alphanumeric string (text in quotation marks) in ISO time format "HH.MM.SS"
TIMESTAMP
Alphanumeric string (text in quotation marks) in "YYYY-MM-DD-HH.MM.SS.NNNNNN" (NNNNNN for nanoseconds) format
customObjectMapper
Alphanumeric
No
Identifier of the Java Bean defined in the beans-context.xml file used to configure the properties for converting a JSON object to an Adelia class instance
brokerConfig
Object
Yes
Information relating to the native subscriber
factory
Object
Yes
Information relating to the Java factory responsible for creating the Adelia subscriber
id
Alphanumeric
Yes
Full name of the Java factory class
parameters
Object
No
Hash table (<alphanumeric key, value> pairs used as parameters when creating the factory)
brokerConnection
Object
No
Specific configuration of parameters for connecting the native subscriber to the target broker
subscribeTopic
Object
Yes
Specific configuration of parameters for subscribing the native subscriber to the target broker
The config object centralizes the general subscriber configuration. It contains information specific to the Adelia execution context as well as information specific to the native subscriber.
In an Adelia subscriber, the information specific to the target broker is (in XPath notation):
config/brokerConfig/factory/id (and config/brokerConfig/factory/parameters if the factory accepts parameters),
config/brokerConfig/brokerConnection,
config/brokerConfig/subscribeTopic.
Example of Adelia RabbitMQ AMQP subscriber in YAML
This depends on the target broker and the factory used. The AMBSS is supplied with a number of factories which support the following brokers or protocols:
List of Kafka broker server URIs used for connection.
fetch.min.bytes
Integer
No
Minimum amount of data the server must return for a fetch request. Default value: 1.
group.id
Alphanumeric
No
Unique string identifying the consumer group this subscriber belongs to.
heartbeat.interval.ms
Integer
No
Interval in milliseconds between heartbeat checks to the consumer group coordinator to indicate that a subscriber is active and connected. Default value: 3000.
max.partition.fetch.bytes
Integer
No
Maximum amount of data per partition that the server will return to the subscriber (in bytes). Default value: 1048576.
session.timeout.ms
Integer
No
Maximum time in milliseconds during which a consumer in a consumer group can be out of contact with a broker before being considered idle and before rebalancing is initiated among the active consumers in the group. Default value: 45000.
ssl.key.password
Alphanumeric
No
Password of the private key in the key storage file or the PEM key specified in "ssl.keystore.key". This is required for clients only if bidirectional authentication is configured. This can be entered unencrypted or encrypted.
ssl.keystore.certificate.chain
Alphanumeric
No
Certificate string in the format specified by "ssl.keystore.type". The default SSL engine factory only handles the PEM format with a list of X.509 certificates.
ssl.keystore.key
Alphanumeric
No
Private key in the format specified by "ssl.keystore.type". The default SSL engine factory only handles the PEM format with PKCS#8 keys. If the key is encrypted, the key password must be specified using "ssl.key.password".
ssl.keystore.location
Alphanumeric
No
Keystore file location. This is optional for the client and can be used for bidirectional authentication for the client.
ssl.keystore.password
Alphanumeric
No
Store password for the keystore file. This is optional for the client and only necessary if "ssl.keystore.location" is configured. The keystore password is not supported for the PEM format.This can be entered unencrypted or encrypted.
ssl.truststore.certificates
Alphanumeric
No
Trusted certificates in the format specified by "ssl.truststore.type". The default SSL engine factory only supports the PEM format with X.509 certificates.
ssl.truststore.location
Alphanumeric
No
Truststore file location.
ssl.truststore.password
Alphanumeric
No
Truststore file password. If no password is defined, the configured truststore file will always be used, but integrity check is disabled. The truststore password is not supported for the PEM format. This can be entered unencrypted or encrypted.
allow.auto.create.topics
Boolean
No
Used to allow automatic topic creation on the broker when subscribing to a topic. Default value: "true".
auto.offset.reset
Alphanumeric. Possible values: "latest", "earliest" or "none".
No
Specifies what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because the data has been deleted). Default value: "latest".
client.dns.lookup
Alphanumeric. Possible values: "use_all_dns_ips" or "resolve_canonical_bootstrap_servers_only"
No
Controls how the subscriber uses DNS lookups. Default value: "use_all_dns_ips".
connections.max.idle.ms
Integer
No
Closes idle connections after the number of milliseconds specified by this configuration. Default value: 540000 (9 Minutes).
default.api.timeout.ms
Integer
No
Wait time (in milliseconds) for client APIs. This configuration is used as the default timeout for all client operations that do not specify a timeout parameter. Default value: 60000 (1 minute).
enable.auto.commit
Boolean
No
If true, the consumer's offset will be periodically committed in the background. Default value: "false".
exclude.internal.topics
Boolean
No
Indicates whether internal topics matching a subscription pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic. Default value: "true".
fetch.max.bytes
Integer
No
Maximum amount of data (in bytes) the server should return for a fetch request. Default value: 52428800.
group.instance.id
Alphanumeric
No
Unique identifier of the consumer instance provided by the end user.
isolation.level
Alphanumeric. Possible values: "read_committed" or "read_uncommitted"
No
Controls how to read messages written transactionally. Default value: "read_uncommitted".
max.poll.interval.ms
Integer
No
Maximum delay between invocations of poll() when using consumer group management (in milliseconds). Default value: 300000 (5 minutes).
max.poll.records
Integer
No
Maximum number of records returned in a single call to poll(). Default value: 500.
partition.assignment.strategy
List of alphanumerics
No
List of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Default value: [ 'org.apache.kafka.clients.consumer.RangeAssignor' , 'org.apache.kafka.clients.consumer.CooperativeStickyAssignor' ]
receive.buffer.bytes
Integer
No
Size of the TCP receive buffer (SO_RCVBUF) to use when reading data (in bytes). A value of -1 indicates that the OS's default value is used. Default value: 65536.
request.timeout.ms
Integer
No
Maximum amount of time the client will wait for the response from a request(in milliseconds). Default value: 30000 (30 seconds).
security.protocol
Alphanumeric. Possible values: "PLAINTEXT", "SSL", "SASL_PLAINTEXT" or "SASL_SSL"
No
Protocol used to communicate with brokers. Default value: "PLAINTEXT".
send.buffer.bytes
Integer
No
Size of the TCP send buffer (SO_SNDBUF) to use when sending data. A value of -1 indicates that the OS's default value is used. Default value: 131072.
socket.connection.setup.timeout.max.ms
Integer
No
Maximum amount of time the client will wait for the socket connection to be established (in milliseconds). Default value: 30000 (30 seconds).
socket.connection.setup.timeout.ms
Integer
No
Amount of time the client will wait for the socket connection to be established (in milliseconds). Default value: 10000 (10 seconds).
ssl.enabled.protocols
List of alphanumerics. Possible values: "TLSv1.2" or "TLSv1.3"
No
List of protocols enabled for SSL connections. Default value: ['TLSv1.2'] or ['TLSv1.2' , 'TLSv1.3']with JDK v11 or higher.
ssl.keystore.type
Alphanumeric
No
File format of the keystore file.
ssl.protocol
Alphanumeric. Possible values: "TLSv1.2" or "TLSv1.3"
No
SSL protocol used to generate the SSLContext. Default value: "TLSv1.2" or "TLSv1.3" with JDK v11 or higher.
ssl.provider
Alphanumeric
No
Name of the security provider used for SSL connections.
ssl.truststore.type
Alphanumeric
No
File format of the truststore file. Default value: "JKS".
auto.commit.interval.ms
Integer
No
Frequency in milliseconds with which the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to "true". Taken into account if enable.auto.commit is "true". Default value: 5000 (5 seconds).
check.crcs
Boolean
No
Automatically checks the CRC32 of the messages consumed. Default value: "true".
client.id
Alphanumeric
No
ID string to pass to the server when making requests.
client.rack
Alphanumeric
No
Rack identifier for this client. This can be any string value indicating the physical location of this client. It corresponds to the "broker.rack" broker configuration.
fetch.max.wait.ms
Integer
No
Maximum amount of time (in milliseconds) the server will block before answering the fetch request if there is not sufficient data to immediately satisfy the requirement given by "fetch.min.bytes". Default value: 500.
interceptor.classes
List of alphanumerics
No
List of classes to use as interceptors. Implementing the org.apache.kafka.clients.consumer.ConsumerInterceptor interface allows you to intercept (and possibly mutate) records received by the consumer.
metadata.max.age.ms
Integer
No
Period of time in milliseconds after which we force a metadata refresh even if we have not seen any partition leadership changes to proactively discover any new brokers or partitions. Default value: 300000 (5 minutes).
metric.reporters
List of alphanumerics
No
List of classes to use as metrics reporters.
metrics.num.samples
Integer
No
Number of samples maintained to compute metrics. Default value: 2.
metrics.recording.level
Alphanumeric. Possible values: "INFO", "DEBUG" or "TRACE"
No
Highest recording level for metrics. Default value: "INFO".
metrics.sample.window.ms
Integer
No
Window of time during which a metrics sample is computed. Default value: 30000 (30 seconds).
reconnect.backoff.max.ms
Integer
No
Maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. Default value: 1000 (1 second).
reconnect.backoff.ms
Integer
No
Base amount of time to wait before attempting to reconnect to a given host. Default value: 50.
retry.backoff.ms
Integer
No
Amount of time to wait before attempting to retry a failed request to a given topic partition. Default value: 100.
security.providers
List of alphanumerics
No
List of configurable creator classes each returning a provider implementing security algorithms.
schema.registry.url
Alphanumeric
No
Schema Registry instance URL.
specific.avro.reader
Boolean
No
Tells Kafka/Schema Registry to use a specific Avro type (user type in this case, resulting from the Avro generation of an Adelia class), otherwise Kafka will wait until the GenericRecord is used for the topic. Default value: "false".
Topic pattern to subscribe to. Taken into account if the "topics" and "partitions" properties are not entered.
topics
List of non-empty alphanumerics
No
List of topics to subscribe to. Taken into account if the "pattern" and "partitions" properties are not entered.
partitions
List of topicPartition objects
No
List of information about topics and partitions to subscribe to. Taken into account if the "pattern" and "topics" properties are not entered.
[n]
Object
Yes
Topic and partition to subscribe to.
topic
Non-empty alphanumeric
Yes
Name of the (n index) topic in the list of information about topics and partitions to subscribe to.
partition
Positive integer
Yes
Number of the partition associated with the (n index) topic in the list of information about topics and partitions to subscribe to.
ackMode
Alphanumeric. Possible values: "RECORD", "BATCH", "TIME", "COUNT", "COUNT_TIME", "MANUAL" or "MANUAL_IMMEDIATE"
No
Specifies the message acquisition and acknowledgment mode.
RECORD: automatically acknowledges the message (commits the message offset) at the end of its handling,
BATCH: automatically acknowledges the messages (commits the message offsets) when all those that were returned by a poll() call have been handled (batch processed),
TIME: automatically acknowledges the messages (commits the message offsets) when all those that were returned by a poll() call have been handled (batch processed) and a time (defined by the ackTime property) since the last commit has been exceeded,
COUNT: automatically acknowledges the messages (commits the message offsets) when all those that were returned by a poll() call have been handled (batch processed) and a number of handled messages (defined by the ackCount property) since the last commit has been exceeded,
COUNT_TIME: similar to TIME and COUNT, but the acknowledgment is performed if one or the other condition is true,
MANUAL: manual acknowledgment of the message(s) by the user (acknowledgment is asynchronous),
MANUAL_IMMEDIATE: manual acknowledgment of the message(s) by the user (acknowledgment is synchronous).
Default value: BATCH.
ackTime
Integer
No
Time in milliseconds after which messages pending acknowledgment are acknowledged. Taken into account if the "ackMode" property is "TIME" or "COUNT_TIME". The value must be higher than the value of the "pollTimeout" property. Default value: 5000.
ackCount
Integer
No
Minimum number of messages pending acknowledgment after which their acknowledgment is triggered. Taken into account if the "ackMode" property is "COUNT" or "COUNT_TIME". Default value: 1.
typeMsgHandling
Alphanumeric. Possible values: "BATCH" or "SINGLE"
No
Message acknowledgment mode (by batch or unit). Taken into account if "ackMode" is "MANUAL" or "MANUAL_IMMEDIATE". Default value: SINGLE.
pollTimeout
Integer
No
Maximum wait time (in milliseconds) to acknowledge messages - timeout value passed when calling the poll(). Default value: 5000.
commitLogLevel
Alphanumeric. Possible values: "DEBUG", "ERROR, "FATAL, "INFO, "TRACE or "WARN"
Native subscriber start mode if topics to subscribe to do not exist in the broker. If "true", the subscriber will not start. Only taken into account if the "topics" or "partitions" properties are entered. Default value: "false".
onlyLogRecordMetadata
Boolean
No
Message log level. If "false", the whole message is logged in the produced logs instead of just the <topic>-<partition>@<offset> logger. Default value: "false".
stopImmediate
Boolean
No
When the subscriber is stopped, allows handling to be stopped after the current message (true) instead of after handling all the messages returned by the previous call to poll() (false). Default value: "false".
Specifies if the client will automatically try to reconnect to the server if the connection is lost. Default value: "false".
reconnectCloseTimeout
Positive integer
No
Successful reconnection attempt wait time before definitively closing the subscriber. Taken into account if "automaticReconnect" is "true". Default value: no wait time.
cleanSession
Boolean
No
Specifies if the client and server need to remember the client's state when reconnecting. Default value: "true". If the value is "false", "subscribeTopic/clientId" must be set.
connectionTimeout
Positive integer
No
Connection timeout value. This value, measured in seconds, defines the maximum time the client will wait for network connection to the MQTT server to be established. Default value: 30.
keepAliveInterval
Positive integer
No
"Keep alive" interval. This value, measured in seconds, defines the maximum time interval between sent or received messages. This enables the client to detect if the server is no longer available, without waiting for TCP/IP timeout. Default value: 60.
maxInflight
Positive integer
No
Sets the maximum number of messages the broker can send without receiving an acknowledgment. Default value: 10.
mqttVersion
Alphanumeric. Possible values: "3.1.1" or "3.1"
No
Sets which version of the MQTT v3 protocol to use (3.1 or 3.1.1). Default value: 3.1.1.
serverURIs
List of alphanumerics
No
List of one or more MQTT server URIs the client can connect to. Default value: [ 'tcp://localhost:1883' ]. If the port is not specified in the URI, it will take the 1883 value for 'tcp://' and 8883 for 'ssl://
userName
Alphanumeric
No
Username to use to connect. This can be entered unencrypted or encrypted.
password
Alphanumeric
No
Password to use to connect. This can be entered unencrypted or encrypted.
will
Object
No
"Last Will and Testament" (LWT) message for connection.
topic
Non-empty alphanumeric
Yes
LWT topic.
payload
Alphanumeric
Yes
LWT message payload.
qos
Integer 0 to 2
No
Quality of service level for publishing LWT messages. Default value: 0.
retained
Boolean
No
Specifies if the LWT message needs to be retained. Default value: "false".
sslProperties
Object
No
SSL connection properties (the connection URL must start with "ssl:" instead of "tcp:").
com.ibm.ssl.protocol
Alphanumeric. Possible values: "SSL", "SSLv2", "SSLv3', "SSL_TLS", "SSL_TLSv2", "TLS", "TLSv1", or "TLSv1.2".
No
Protocol type
com.ibm.ssl.contextProvider
Alphanumeric
No
Underlying JSSE provider For example: "IBMJSSE2" or "SunJSSE".
com.ibm.ssl.keyStore
Alphanumeric
No
Name of file containing the KeyStore object you want the KeyManager to use. For example: /monrep/etc/key.p12
com.ibm.ssl.keyStorePassword
Alphanumeric
No
Password for the KeyStore object you want the KeyManager to use. This can be entered unencrypted or encrypted.
com.ibm.ssl.keyStoreType
Alphanumeric
No
Keystore type, for example: "PKCS12", "JKS" or "JCEKS".
com.ibm.ssl.keyStoreProvider
Alphanumeric
No
Keystore provider, for example: "IBMJCE" or "IBMJCEFIPS".
com.ibm.ssl.keyManager
Alphanumeric
No
Algorithm that will be used to instantiate a KeyManagerFactory object instead of using the default algorithm available in the platform. Examples of values: "IbmX509" or "IBMJ9X509"
com.ibm.ssl.trustStore
Alphanumeric
No
Name of file containing the KeyStore object you want the TrustManager to use.
com.ibm.ssl.trustStorePassword
Alphanumeric
No
Password for the TrustStore you want the TrustManager to use. This can be entered unencrypted or encrypted.
com.ibm.ssl.trustStoreType
Alphanumeric
No
KeyStore object type you want the default TrustManager to use. Same possible values as "keyStoreType".
com.ibm.ssl.trustStoreProvider
Alphanumeric
No
Truststore provider, for example: "IBMJCE" or "IBMJCEFIPS".
com.ibm.ssl.trustManager
Alphanumeric
No
Algorithm that will be used to instantiate a TrustManagerFactory object instead of using the default algorithm available in the platform. Examples of values: "PKIX" or "IBMJ9X509"
com.ibm.ssl.enabledCipherSuites
Alphanumeric
No
List of enabled encryptions. The values depend on the provider, for example: SSL_RSA_WITH_AES_128_CBC_SHA;SSL_RSA_WITH_3DES_EDE_CBC_SHA
com.ibm.ssl.clientAuthentication
Alphanumeric
No
Determines if SSL client authentication is required. Default value: "false".
Message storage directory. Taken into account when "persistence" is "file". Default value: JVM "user.dir".
manualAcks
Boolean
No
Specifies if message acknowledgment is automatic or manual. Default value: "false".
timeToWait
Integer
No
Maximum time to wait for an action with the broker to finish. A value of 0 or -1 indicates "no maximum wait time" (wait until the end of the action). Default value: -1.
topics
List of topic objects
Yes
List of information about topics to subscribe to.
[n]
Object
Yes
Topic to subscribe to.
topicFilter
Non-empty alphanumeric
Yes
Name of the (n index) topic in the list of information about topics to subscribe to.
qos
Integer 0 to 2
No
Quality of service level for receiving messages from the (n index) topic in the list of information about topics to subscribe to. Default value: 1.
Specifies if the client will automatically try to reconnect to the server if the connection is lost. Default value: "false".
automaticReconnectMinDelay
Integer
No
Minimum number of seconds to wait before attempting to reconnect automatically. Default value: 1.
automaticReconnectMaxDelay
Integer
No
Maximum number of seconds to wait before attempting to reconnect automatically. Default value: 120
reconnectCloseTimeout
Positive integer
No
Successful reconnection attempt wait time before definitively closing the subscriber. Taken into account if automaticReconnect is "true". Default value: "no wait time".
cleanStart
Boolean
No
Specifies if the client and server need to remember the session state when restarting and reconnecting. Default value: "true".
connectionTimeout
Positive integer
No
Connection timeout value. This value, measured in seconds, defines the maximum time the client will wait for network connection to the MQTT server to be established. Default value: 30.
httpsHostnameVerificationEnabled
Boolean
No
Enables host verification during an HTTPS connection. Default value: "true".
keepAliveInterval
Positive integer
No
"Keep alive" interval. This value, measured in seconds, defines the maximum time interval between sent or received messages. This enables the client to detect if the server is no longer available, without waiting for TCP/IP timeout. Default value: 60.
maximumPacketSize
Integer between 1 and 2684354656
No
Maximum packet size. Default value: "no limit".
maxReconnectDelay
Integer
No
Maximum wait time (in milliseconds) between reconnect attempts. Default value: 128000.
receiveMaximum
Integer between 1 and 65535
No
Maximum number of received messages This value represents the maximum number of QoS 1 and QoS 2 messages the client is ready to process simultaneously.
There is no mechanism for limiting the number of publications in QoS 0 that the broker can send. Default value: 65535.
requestProblemInfo
Boolean
No
Information request indicator relating to problems encountered in the communication protocol. Default value: "true".
requestResponseInfo
Boolean
No
Request response information indicator. Default value: "false".
serverURIs
List of alphanumerics
No
List of one or more MQTT server URIs the client can connect to. Default value: [ ' tcp://localhost:1883 ' ]. If the port is not specified in the URI, it will take the 1883 value for 'tcp://' and 8883 for 'ssl://'
sessionExpiryInterval
Integer
No
Session expiry interval. This value, measured in seconds, defines the maximum time the broker will maintain the session once the client has disconnected. Clients should only connect with a long session expiry interval if they intend to connect to the server later. Default value: never.
topicAliasMaximum
Integer between 0 and 65535
No
Maximum topic aliases. This value, if present, represents the highest value the client will accept as a topic alias sent by the server. Default value: 0.
userName
Alphanumeric
No
Username to use to connect. This can be entered unencrypted or encrypted.
password
Alphanumeric
No
Password to use to connect. This can be entered unencrypted or encrypted.
userProperties
List of lists with two alphanumeric elements
No
User properties. A user property is a pair of UTF-8 strings. The same name may appear several times.
useSubscriptionIdentifiers
Boolean
No
Specifies if subscription identifiers need to be automatically allocated when subscribing to a topic. Default value: "true".
will
Object
No
"Last Will and Testament" (LWT) message for connection.
topic
Non-empty alphanumeric
Yes
LWT topic.
payload
Alphanumeric
Yes
LWT message payload.
qos
Integer 0 to 2
No
Quality of service level for publishing LWT messages. Default value: 0.
retained
Boolean
No
Specifies if the LWT message needs to be retained. Default value: "false".
sslProperties
Object
No
SSL connection properties (the connection URL must start with "ssl:" instead of "tcp:").
com.ibm.ssl.protocol
Alphanumeric. Possible values: "SSL", "SSLv2", "SSLv3", "SSL_TLS", "SSL_TLSv2", "TLS", "TLSv1" or "TLSv1.2"
No
Protocol type
com.ibm.ssl.contextProvider
Alphanumeric
No
Underlying JSSE provider For example: "IBMJSSE2" or "SunJSSE".
com.ibm.ssl.keyStore
Alphanumeric
No
Name of file containing the KeyStore object you want the KeyManager to use. For example: /monrep/etc/key.p12
com.ibm.ssl.keyStorePassword
Alphanumeric
No
Password for the KeyStore object you want the KeyManager to use. This can be entered unencrypted or encrypted.
com.ibm.ssl.keyStoreType
Alphanumeric
No
Keystore type, for example: "PKCS12", "JKS" or "JCEKS".
com.ibm.ssl.keyStoreProvider
Alphanumeric
No
Keystore provider, for example: "IBMJCE" or "IBMJCEFIPS".
com.ibm.ssl.keyManager
Alphanumeric
No
Algorithm that will be used to instantiate a KeyManagerFactory object instead of using the default algorithm available in the platform. Examples of values: "IbmX509" or "IBMJ9X509".
com.ibm.ssl.trustStore
Alphanumeric
No
Name of file containing the KeyStore object you want the TrustManager to use.
com.ibm.ssl.trustStorePassword
Alphanumeric
No
Password for the TrustStore you want the TrustManager to use. This can be entered unencrypted or encrypted.
com.ibm.ssl.trustStoreType
Alphanumeric
No
KeyStore object type you want the default TrustManager to use. Same possible values as "keyStoreType".
com.ibm.ssl.trustStoreProvider
Alphanumeric
No
Truststore provider, for example: "IBMJCE" or "IBMJCEFIPS".
com.ibm.ssl.trustManager
Alphanumeric
No
Algorithm that will be used to instantiate a TrustManagerFactory object instead of using the default algorithm available in the platform. Examples of values: "PKIX" or "IBMJ9X509".
com.ibm.ssl.enabledCipherSuites
Alphanumeric
No
List of enabled encryptions. The values depend on the provider, for example: SSL_RSA_WITH_AES_128_CBC_SHA;SSL_RSA_WITH_3DES_EDE_CBC_SHA
com.ibm.ssl.clientAuthentication
Alphanumeric
No
Determines if SSL client authentication is required. Default value: "false".
Message storage directory. Taken into account when "persistence" is "file". Default value: JVM "user.dir".
manualAcks
Boolean
No
Specifies if message acknowledgment is automatic or manual. Default value: "false".
timeToWait
Integer
No
Maximum time to wait for an action with the broker to finish. A value of 0 or -1 indicates "no maximum wait time" (wait until the end of the action). Default value: -1.
topics
List of topic objects
Yes
List of information about topics to subscribe to.
[n]
Object
Yes
Topic to subscribe to.
topicFilter
Non-empty alphanumeric
Yes
Name of the (n index) topic in the list of information about topics to subscribe to.
qos
Integer 0 to 2
No
Quality of service level for receiving messages from the (n index) topic in the list of information about topics to subscribe to. Default value: 1.
retainAsPublish
Boolean
No
Specifies if the broker retains the "retain" flag when sending a retained message to the subscriber. Default value: "false".
noLocal
Boolean
No
Used to not receive the messages published by the subscriber. Default value: "false".
retainHandling
Integer 0 to 2
No
Specifies if the server sends the retained message to the client when establishing a subscription. Default value: 0.
Prefix for Jetstream topics. Default value: $JS.API.
requestTimeout
Alphanumeric in ISO-8601 duration format
No
Request timeout for Jetstream API calls. Default value: "PT2S" (2 seconds).
natsCon
Object
No
Nats connection parameters.
bufferSize
Integer
No
Initial size of buffers (in bytes) in the connection, mainly for testing. Default value: 64* 1024.
connectionName
Alphanumeric
No
Sets the connection name.
connectionTimeout
Alphanumeric in ISO-8601 duration format
No
Sets the connection attempt timeout. Default value: "PT2S" (2 seconds).
dataPortType
Alphanumeric
No
Sets the Java class to use for this connection data port.
discardMessagesWhenOutgoingQueueFull
Boolean
No
Enables discard messages when the outgoing queue is full. Default value: "false".
inboxPrefix
Alphanumeric
No
Connection's inbox prefix. Default value: _INBOX.
maxControlLine
Integer
No
Maximum length (in bytes) of a control line sent by this connection. Default value: 4096.
maxMessagesInOutgoingQueue
Integer
No
Maximum number of messages in the outgoing queue. Default value: 5000.
maxPingsOut
Integer
No
Maximum number of pings the client can have in flight. Default value: 2.
maxReconnects
Integer
No
Maximum number of reconnect attempts. Value 0 disables reconnects. Value-1 enablesinfinite reconnects. Default value: 60.
noEcho
Boolean
No
Turns off echo. Default value: "false".
noHeaders
Boolean
No
Disables header handling. Default value: "false".
noNoResponders
Boolean
No
Disables assistance with no response. Default value: "false".
noRandomize
Boolean
No
Disables server pool randomization. Default value: "false".
oldRequestStyle
Boolean
No
Enables the old request style which uses a new inbox and new subscriber for each request. Default value: "false".
opentls
Boolean
No
SSL context which accepts any server certificate and has no client certificate. Default value: "false".
pedantic
Boolean
No
Enables pedantic mode for the server in relation to this connection. Default value: "false".
pingInterval
Alphanumeric in ISO-8601 duration format
No
Interval between server ping attempts. Default value: "PT2M" (2 minutes).
reconnectBufferSize
Integer
No
Maximum number of bytes to buffer in the client when trying to reconnect. Default value: 8388608.
reconnectJitter
Integer
No
Wait time between reconnect attempts to the same server. Default value: "PT0.1S" (100 milliseconds).
reconnectJitterTls
Integer
No
Wait time of a tls/secure connection between reconnect attempts to the same server. Default value: "PT1S" (1 second).
reconnectWait
Integer
No
Wait time between reconnect attempts to the same server. Default value: "PT2S" (2 seconds).
requestCleanupInterval
Alphanumeric in ISO-8601 duration format
No
Interval between cleaning passes on outstanding request futures that are cancelled or timeout in the application code. Default value: "PT5S" (5 seconds).
secure
Boolean
No
Options for using the default SSL context if there is one. Default value: "false".
servers
List of alphanumerics
No
List of server URIs to connect to. Default value: [ "nats://localhost:4222" ].
shareConnection
Boolean
No
Share a connection (to the Nats server) for several subscribers. Default value: "false".
If the value is "true", the subscriber to be created will use an existing connection of a running subscriber if this has the same characteristics as the subscriber to be created (all the properties of the two natsCon objects must have the same value). If not, a new connection will be created (using the natsCon object properties). This can be shared with the next subscribers created. If a connection is shared among several subscribers, only the last subscriber stopped closes the connection to the Nats server.
If the value is "false", a new connection to the Nats server is created for each subscriber.
Binds the durable subscriber to the stream. Default value: "false".
pendingMessageLimit
Integer
No
Limits on the maximum number of messages the consumer will retain before starting to delete new messages until the application empties the queue. Default value: 65536.
pendingByteLimit
Integer
No
Limits on the maximum size of messages (in bytes) the consumer will retain before starting to delete new messages until the application empties the queue. Default value: 67108864.
pushMaxWait
Integer
Yes
Maximum wait time to receive a message.
consumerConfig
Object
No
Consumer configuration information.
ackPolicy
Alphanumeric. Possible values: "All", "Explicit" our"None"
No
Message acknowledgment mode. Default value: "Explicit". If subscribeType is "Pull", the only possible value is "Explicit".
ackWait
Alphanumeric in ISO-8601 duration format
No
Acknowledgment wait time.
deliverPolicy
Alphanumeric. Possible values: "All", "ByStartSequence", "ByStartTime", "Last", "LastPerSubject" or "New"
No
Strategy for delivering messages to the consumer. Default value: "All".
description
Alphanumeric
No
Sets a description for this configuration.
filterSubject
Alphanumeric
No
Filter subject.
flowControl
Alphanumeric in ISO-8601 duration format
No
Enables flow control. Valid only if subscribeType is "Push".
idleHeartbeat
Alphanumeric in ISO-8601 duration format
No
Wait time for consumer's idle heartbeats. Valid only if subscribeType is "Push".
maxAckPending
Integer
No
Maximum number of pending acknowledgments. Valid only if subscribeType is "Push" and if ackPolicy is different from "None". Default value: -1.
maxDeliver
Integer
No
Maximum number of message deliveries. Default value: -1.
maxPullWaiting
Integer
No
Maximum pull waiting: number of pulls that can be outstanding on a consumer in "PULL" mode. Pulls received after this number is reached are ignored. Default value: 0.
rateLimit
Integer
No
Rate limit (number of messages per second). Default value: -1.
replayPolicy
Alphanumeric. Possible values: "Instant" or "Original"
No
Defines the message replay strategy. Taken into account only if deliverPolicy is "All", "ByStartSequence" or "ByStartTime". Default value: "Instant".
sampleFrequency
Alphanumeric
No
Sampling frequency as a percentage. For example: 30% or 30 (value must be between 0 and 100).
startSequence
Integer
No
Start sequence for message acquisition in the stream. Taken into account if deliverPolicy is "ByStartSequence".
startTime
Alphanumeric in ISO-8601 duration format
No
Start time for message acquisition in the stream. Taken into account if deliverPolicy is "ByStartTime".
pullSubOpts
Object
No
"PULL" type subscription information.
durable
Alphanumeric
Yes
Durable subscriber name.
stream
Alphanumeric
No
Name of stream containing the topic to subscribe to.
bind
Boolean
No
Binds the durable subscriber to the stream. Default value: "false".
pullType
Alphanumeric. Possible values: "Fetch" or "Iterate"
Yes
Acquisition mode for messages in "PULL" mode.
pullBatchSize
Integer between 1 and 256
Yes
Maximum number of messages retrieved in a batch.
pullMaxWait
Alphanumeric in ISO-8601 duration format
Yes
Maximum wait time for the batch to receive messages.
consumerConfig
Object
No
Consumer configuration information.
ackPolicy
Alphanumeric. Possible values: "All", "Explicit" or "None"
No
Message acknowledgment mode. Default value: "Explicit". If subscribeType is "Pull", the only possible value is "Explicit".
ackWait
Alphanumeric in ISO-8601 duration format
No
Acknowledgment wait time.
deliverPolicy
Alphanumeric. Possible values: "All", "ByStartSequence", "ByStartTime", "Last", "LastPerSubject" or "New"
No
Strategy for delivering messages to the consumer. Default value: "All".
description
Alphanumeric
No
Sets a description for this configuration.
filterSubject
Alphanumeric
No
Filter subject.
flowControl
Alphanumeric in ISO-8601 duration format
No
Enables flow control. Valid only if subscribeType is "Push".
idleHeartbeat
Alphanumeric in ISO-8601 duration format
No
Wait time for consumer's idle heartbeats. Valid only if subscribeType is "Push".
maxAckPending
Integer
No
Maximum number of pending acknowledgments. Valid only if subscribeType is "Push" and if ackPolicy is different from "None". Default value: -1.
maxDeliver
Integer
No
Maximum number of message deliveries. Default value: -1.
maxPullWaiting
Integer
No
Maximum pull waiting: number of pulls that can be outstanding on a consumer in "PULL" mode. Pulls received after this number is reached are ignored. Default value: 0.
rateLimit
Integer
No
Rate limit (number of messages per second). Default value: -1.
replayPolicy
Alphanumeric. Possible values: "Instant" or "Original"
No
Message replay strategy. Taken into account only if deliverPolicy is "All", "ByStartSequence" or "ByStartTime". Default value: "Instant".
sampleFrequency
Alphanumeric
No
Sampling frequency as a percentage. For example: 30% or 30 (value must be between 0 and 100).
startSequence
Integer
No
Start sequence for message acquisition in the stream. Taken into account if deliverPolicy is "ByStartSequence".
startTime
Alphanumeric in ISO-8601 duration format
No
Start time for message acquisition in the stream. Taken into account if deliverPolicy is "ByStartTime".
Defines the subscriber as being the exclusive consumer of the associated queue. Default value: "false".
consumerArgs
Object
No
Consumer properties.
xPriority
Integer
No
Consumer priority in the group. Default value: 0.
queue
Object
Yes
Properties of the queue associated with the consumer.
name
Non-empty alphanumeric
Yes
Queue name.
declarePassive
Boolean
No
Uses an existing queue in the topology ("true" value) or creates a new queue ("false" value). Default value: "false".
durable
Boolean
No
Queue persistence. Default value: "false".
autoDelete
Boolean
No
Queue lifespan. An autoDelete queue is automatically deleted when the last consumer subscribed to the queue unsubscribes. Default value: "true".
exclusive
Boolean
No
Defines queue exclusivity. An exclusive queue is used by a single connection and will be deleted when the connection is closed. Default value: "true".
queueArgs
Object
No
Additional information about the queue behavior.
xMessageTtl
Integer
No
Lifespan of messages in the queue (in milliseconds). Default value: "unlimited".
xExpires
Integer
No
Maximum queue idle time (no associated subscriber) before deletion (in milliseconds). Default value: "unlimited".
xMaxLength
Integer
No
Maximum number of messages contained in the queue (the oldest messages will be deleted from the queue if the limit is reached). Default value: "unlimited".
xMaxLengthBytes
Integer
No
Maximum number of bytes contained in the queue (the oldest messages will be deleted from the queue if the limit is reached). Default value: "unlimited".
xOverflow
Alphanumeric. Possible values: "drop-head", "reject-publish" or "reject-publish-dlx"
No
Behavior of a full queue when it receives a new message. Default value: "drop-head".
xDeadLetterExchange
Alphanumeric
No
Name of the exchange to republish "dead letter" messages to from a queue (messages received by the queue in specific conditions).
xDeadLetterRoutingKey
Alphanumeric
No
Routing key to use for "dead letter" messages.
xMaxPriority
Integer between 1 and 255
No
Maximum level of priority the queue supports in message priority management. Default value: "none" (no message priority management).
xQueueMode
Alphanumeric. Possible values: "default" or "lazy"
No
Backup mode for messages in the queue. Default value: "default".
xQueueMasterLocator
Alphanumeric. Possible values: "min-masters", "client-local" or "random"
No
Master queue location in a distributed server environment.
bind
List of Bind objects
No
List of queue-to-exchange binding information.
[n]
Object
Yes
Queue-to-exchange binding information.
routingKey
Alphanumeric
No
Routing key used to bind the exchange to the queue.
bindArgs
Object
No
Exchange and queue binding properties hash table (<alphanumeric key, value> pairs used as parameters when binding). Example of "x-match" key (for "Headers"-type exchange).
exchange
Object
No
Information on exchange bound to the queue.
name
Alphanumeric
Yes
Name of exchange bound to the queue.
type
Alphanumeric. Possible values: "direct", "fanout", "headers" or "topic"
Yes
Type of exchange bound to the queue.
durable
Boolean
No
Exchange persistence. Default value: "false".
autoDelete
Boolean
No
Exchange lifespan. Default value: "false".
declarePassive
Boolean
No
Uses an existing exchange in the topology or creates a new exchange. Default value: "false".
exchangeArgs
Object
No
Additional information about the exchange behavior
alternateExchange
Alphanumeric
No
Name of an alternative exchange used when the exchange is unable to route a message.
Alphanumeric. Possible values: "CA", "FULL" or "NONE"
No
Peer verification mode. Taken into account if sslVerifyPeer is true. Default value: "FULL".
host
Non-empty alphanumeric
Yes
Redis server host name.
port
Integer between 1 and 65535
No
Number of TCP port used to connect to the server. Default value: 6379.
clientOptions
Object
No
Options for checking the behavior of the Redis client.
autoReconnect
Boolean
No
Enables automatic reconnection if the connection is lost. Default value: "true".
reconnectCloseTimeout
Positive integer
No
Successful reconnection attempt wait time before definitively closing the subscriber. Taken into account if autoReconnect is "true". Default value: "no wait time".
protocolVersion
Alphanumeric. Possible values: "RESP2" or "RESP3"
No
Protocol version to use to connect. Default value: "RESP3".
commandsTimeout
Alphanumeric in ISO-8601 duration format
No
Maximum wait time for a server command response. Default value: "no wait time".
pingBeforeActivateConnection
Boolean
No
Defines the sending of a PING before activating the connection. Taken into account if protocolVersion is "RESP2". Default value: "true".
Alphanumeric. Possible values: "byteArray", "utf8String", "asciiString" or "string(<encoding>)"
No
Type of message payload and channels/patterns to subscribe to.
Values "utf8String", "asciiString" and "string(<encoding>)" indicate that the payload (as well as the channels/patterns) is alphanumeric.
The "string(<encoding>)" value defines specific character encoding. This encoding value must be included in the character set registry defined by the IANA. It must also be included in the character sets supported by the JVM.
The "byteArray" value indicates that the payload (as well as the channels/patterns) is a byte array. In this case, the values associated with the channels/patterns properties (described below) must be Base64-encoded alphanumeric strings (byte array expressed in alphanumeric format).
This file in YAML format (UTF-8 encoding) is used to statically describe all the subscribers to start when the AMBSS is started. Also, when creating new subscribers via the REST APIs, their configuration can be saved in the subscribersConf.yml file.
The structure of this file is an object with two properties (or sections):
configurations: List of factored configurations
subscribers: Subscriber list
subscribersConf.yml vide
configurations:
-
subscribers:
-
subscribersConf.yml containing the Adelia RabbitMQ AMQP subscriber
When writing subscribers, identical information blocks may need to be duplicated (for example, the brokerConnection object in the case of subscribers connected to the same broker or the subscribeTopic object in the case of subscribers subscribed to the same topic).
This descriptive redundancy can be avoided by putting these configuration elements in the configurations section of the subscribersConf.yml file and declaring, in the subscribers section, the subscribers which will no longer contain the redundant configuration elements but a reference to these in the configurations section.
This groups together a list of configuration objects: each object is identified by a unique identifier and contains a subscriber's config object properties:
Name
Type
Mandatory
Description
id
Alphanumeric
No
Alphanumeric identifier of the configuration object
adeliaPoolName
Alphanumeric
No
Name of the Adelia pool used for DB connections
adeliaPgm
Object
No
Information relating to the associated EADELIA program
objectFileName
Alphanumeric
Yes
Name of the object file produced by the 3GL generation of the associated EADELIA program
javaPackage
Alphanumeric
No
Name of the Java package defined for the 3GL generation of the associated EADELIA program
parameters
List of alphanumerics
No
List of alphanumeric parameters passed to the associated EADELIA program during execution
customObjectMapper
Alphanumeric
No
Java Bean identifier defined in the beans-context.xml file, used to configure the properties to convert a JSON object to an Adelia class instance
brokerConfig
Object
No
Information relating to the native subscriber
factory
Object
No
Information relating to the Java factory responsible for creating the Adelia subscriber
id
Alphanumeric
Yes
Full name of the Java factory class
parameters
Object
No
Hash table (<alphanumeric key, value> pairs used as parameters when creating the factory)
brokerConnection
Object
No
Specific configuration of parameters for connecting the native subscriber to the target broker
subscribeTopic
Object
No
Specific configuration of parameters for subscribing the native subscriber to the target broker
A configuration object may be incomplete and not enter all the adeliaPoolName, adeliaPgm and brokerConfig properties
In the example, the configurations section has three configuration objects: the first (fullConfig) describes a full configuration while the other three (partialConfigAdeliaPgm, partialConfigBrokerConfig and partialConfigBrokerConnection) describe incomplete configurations.
Subscribers section
As seen previously, this section contains all the Adelia subscriber descriptions: each subscriber description must be complete (each property described as mandatory must be present).
However, a subscriber can be described by referring to a configuration object in the configurations section in its config object. In this case, the subscriber description is deduced by replacing all the references used by the referenced objects: the result description must be complete in order to be a valid Adelia subscriber description.
Reference syntax
The syntax of a reference to a configuration object declared in the configurations section is as follows:
'/configurations/<id of the configuration object>'
Examples:
'/configurations/fullConfig'
'/configurations/partialConfigAdeliaPgm'
'/configurations/partialConfigBrokerConfig'
Using a reference
In a subscriber description, one or more references are used in the config object.
Config value
When the config object has a reference as a value, all the properties of the referenced configuration object (apart from the "id" property) become the config object properties
To use several references, they need to be placed in a list. In this case, the config object is the result of the merger of all the properties of the referenced configuration objects (except the "id" properties) starting with index reference 1 in the list, up to n.
If the referenced configuration objects describe the same properties, the values of the properties of index reference n + 1 take priority over those of index reference n in the merge process.
References and properties of this object can be used in the config object. To do this, the reference (or list of references) is set via the "$ref" property. During the merge process, the "direct" properties of the config object take priority over the referenced configuration object properties.
When describing a subscriber, certain values associated with properties relating to sensitive data can be encrypted (like usernames or passwords). The encryption process uses an RSA256 asymmetric encryption algorithm based on the keys contained in the RSJwtSecurity.key keystore (which is also used as default to encrypt/decrypt the JWT tokens used for AMBSS authentication when security is enabled).
To indicate that a property value is encrypted, it (a Base64-encoded alphanumeric value) needs to be prefixed with the "{RSA}" string.
Example:
Example of Adelia RabbitMQ AMQP subscriber in YAML
Note: Alphanumeric strings can be configured with a prefix and suffix to be added to the encrypted values using the following two properties, defined in the application.yml and application-dev.yml configuration files:
An alphanumeric value is encrypted using the following instruction:
java -cp "%ADELIWS%javarun\*" com.hardis.jwtprovider.JwtKeyTool -encode -keyfile "<Path to AMBSS install dir>\extdConfig\RSJwtSecurity.key""<alphanumeric value to encrypt>"
The value of the RSJwtSecurity.key keystore delivered by default in the AMBSS can be changed using the following instruction:
java -cp "%ADELIWS%javarun\*" com.hardis.jwtprovider.JwtKeyTool -generate "<Path to AMBSS install dir>\extdConfig\RSJwtSecurity.key"
NB: If you have changed the value of the RSJwtSecurity.key keystore and are using the standalone authentication server %adeliws%/javarun/jwtProviderStandAlone/jwtProviderStandAlone.war with the default keystore (which is the same RSJwtSecurity.key file), you need to copy the newly created keystore file to the location defined in the META-INF\context.xml file of the installed jwtProviderStandAlone web application.
The AMBSS does not offer an Adelia subscriber supervision console.
However, there are several channels for monitoring subscriber execution.
Via the broker
Some brokers provide tools for monitoring the state of consumers.
For example, RabbitMQ offers a monitoring web console via the installation of an extension (RabbitMQ Management plugin).
Via logs
The analysis of logs produced by the AMBSS runtime and native subscribers identifies the subscriber state (define the desired log levels in the application.yml file).
Via the EADELIA program
When an error is detected by the native subscriber, this is sent to the EADELIA program and the program is informed of it. Depending on the type of error, this may shut down the subscriber.
Via REST APIs
Of the proposed APIs, one is used to read the description and state of a subscriber (started, stopped, awaiting a message, handling a message, with errors).