Kafka Java Client for Oracle Transactional Event Queues hands-on

Preamble

Kafka Java Client for Oracle Transactional Event Queues is the Oracle answer to transparently replace your Java Kafka application producing and consuming event from/to a Kafka cluster. And so replacing your Kafka cluster by an Oracle database…

First question I have asked myself: why the hell would I replace my Kafka cluster by an Oracle database ? Firstly, inside an Oracle database what will replace (or complement) a Kafka cluster is Transactional Event Queues (TxEventQ) (http).

This is explained in Apache Kafka versus Oracle Transactional Event Queues (TEQ) as Microservices Event Mesh blog post and the key arguments are roughly:

  • Oracle database now support all modern datatype: JSON, XML, graph, spacial, vector,…
  • Oracle database has a huge ecosystem of supported languages to access it: Python, Typescript, …
  • All High availability and manageability features coming with Oracle database
  • Direct access to the data by other Oracle based applications

But obviously at the end you have to find your own benefit(s) to go with TxEventQ and Kafka Oracle client…

At the time of writing this blog post the Oracle official documentation is really misleading because in 21c Transactional Event Queues and Advanced Queuing User’s Guide documentation the chapter Kafka Java Client Interface for Oracle Transactional Event Queues give you the feeling that you can use this database release. But in Oracle Kafka GitHub official page it is clearly specified that this client works only with Oracle Database 23ai Free version as well as Oracle Database 23ai available on Oracle Autonomous Cloud platform.

And in any case the DBMS_AQADM.CREATE_DATABASE_KAFKA_TOPIC procedure does not exist in 21c…

So I have used Podman 5.3.0, Oracle 23ai free (23.5) container and IntelliJ IDEA 2024.3 (Community Edition) to test the feature.

Podman quick setup

I am not entering in a complete Podman setup as this is clearly not the scope of this article. Following tons of (usual) problem with my company proxy I have finally initialized the Podman machine with –user-mode-networking option to route traffic through a host user-space process and benefit from my Windows laptop proxy configuration:

PS C:\> podman machine init --user-mode-networking
Looking up Podman Machine image at quay.io/podman/machine-os-wsl:5.3 to create VM
Getting image source signatures
Copying blob 70404e8d767d done   |
Copying config 44136fa355 done   |
Writing manifest to image destination
70404e8d767d50e97fc895a6cdf962922662c309b2afe682bbdf6982ce02d550
Extracting compressed file: podman-machine-default-amd64: done
Importing operating system into WSL (this may take a few minutes on a new WSL install)...
Import in progress, this may take a few minutes.
The operation completed successfully.
Installing user-mode networking distribution...
Import in progress, this may take a few minutes.
The operation completed successfully.
Configuring system...
Machine init complete
To start your machine run:
 
        podman machine start

My first Podman pull failed for an (again usual) certificate error (my company proxy is replacing web sites certificates by its own certificates). To solve it I would need to copy my ZScaler proxy certificates into Podman or use a trick:

PS C:\> podman pull container-registry.oracle.com/database/free:latest
Trying to pull container-registry.oracle.com/database/free:latest...
Error: initializing source docker://container-registry.oracle.com/database/free:latest: pinging container registry container-registry.oracle.com: Get "https://container-registry.oracle.com/v2/": tls: failed to verify certificate: x509: certificate signed by unknown authority

I have tried to put my ZScaler certificates in C:\Users\\.config\containers\certs.d but no luck… So:

PS C:\> podman pull --tls-verify=false container-registry.oracle.com/database/free:latest
Trying to pull container-registry.oracle.com/database/free:latest...
Getting image source signatures
Copying blob sha256:ecb4fe8cffd3eb8e7349984bad653ddfdf48cd41912cf122b4fff7ef87b030c9
Copying blob sha256:bd48d3f50dd7a3c0d666b758841cbad5de9471a7241b75492c0da564e8bc5aa4
Copying blob sha256:30950d539508065e4579e853745173a482835e9fdcbc7289cf7cbd651b8d0ddb
Copying blob sha256:52be51345d28fde108b3348b9cc0030c668da4cba4e3d57ce2bbab37aa6e2746
Copying blob sha256:c6905a973195abaadaa3b450e399df761e7bcbaaf4646d0e30807e7948a7475c
Copying blob sha256:3776f597231403d968231816a6dae9a5d1b5ae2321cbd330e06dfd46cb575a3b
Copying blob sha256:b65c5eddd16b6a3592eb8784ccc8b23bb064269ccf938554608a09f5a89edd34
Copying blob sha256:a46ca36d2998cca56fdbae2161af59fb0155bd6d32f32c72d782ec187e0b5bc0
Copying blob sha256:28c76d32849a75ee2e5f342694a8c2e62b2701204ed3df4c5489658be0e6ded5
Copying blob sha256:af2c542e5953b7919ce90814c6c91edee4abf6bad18523fcbd4e1f5461226ca3
Copying blob sha256:85b3ebaf2b5715dbcb0930c30b58c842e6b8ca14210fd482e3981d5b0808fb7a
Copying blob sha256:60ae4f7af338132b29218f431465cf759ffc0beb9999997f2e2fb1f0e03628a4
Copying blob sha256:887e445c4c0634434337c4c5eee5a0ab45cb239bd33b8cfb058ebed8750e3557
Copying blob sha256:c4cee0c7a6c25fefeead84d26068cbdd4f6ec1a335ae422fbdf9aaf863af6da9
Copying blob sha256:c248d1418febceee657af4a4290aebd6b992bcc842740765cbb3b0e46b4e204b
Copying blob sha256:644079d386d8e8452e275e6cd20bfe4da0221a64f1665b17164cb22d4f3bd7fa
Copying blob sha256:df8f4eb20bb1e711b3dd42d4769d70932c39839364658519788a1af799902745
Copying blob sha256:9ec103b6e2752a3992fac4c5342a9b3ff798eb5b87f707ff55d48ddd07754ff9
Copying blob sha256:f6a8af0be82f807ef73999571d7c5bf25722307c43d5bb1d637edec1024f0fe8
Copying blob sha256:92f086a2c5c6daee85e9e14eeca6e2c5ed4d929e74b983d5540b7a20098c7a52
Copying blob sha256:6bb22823d4eebc727ec772a5ed230f21eb063dd6dbbbfdb37bf71886293f1e78
Copying config sha256:e1ff84cf03edd29bdc63d9a7d868168c262c95c9039778598db14e4bce15fa53
Writing manifest to image destination
e1ff84cf03edd29bdc63d9a7d868168c262c95c9039778598db14e4bce15fa53
PS C:\> podman images
REPOSITORY                                   TAG         IMAGE ID      CREATED       SIZE
container-registry.oracle.com/database/free  latest      e1ff84cf03ed  2 months ago  9.48 GB
PS C:\> podman run --name ora23ai -p 1551:1521 --detach container-registry.oracle.com/database/free:latest
e4be1325622d7e62f5a9dbc93e0d9f42dd8e09acdb9c74ff5a230d0e4d94ecf2
PS C:\> podman ps --all
CONTAINER ID  IMAGE                                               COMMAND               CREATED         STATUS                    PORTS                             NAMES
e4be1325622d  container-registry.oracle.com/database/free:latest  /bin/bash -c $ORA...  17 seconds ago  Up 18 seconds (starting)  0.0.0.0:1551->1521/tcp, 1521/tcp  ora23ai

As I have not used the -e ORACLE_PWD parameter when creating the container I must change SYS, SYSTEM and PDB_ADMIN password with:

PS C:\> podman exec e4be1325622d ./setPassword.sh secure_password
The Oracle base remains unchanged with value /opt/oracle
 
SQL*Plus: Release 23.0.0.0.0 - for Oracle Cloud and Engineered Systems on Thu Nov 7 13:51:16 2024
Version 23.5.0.24.07
 
Copyright (c) 1982, 2024, Oracle.  All rights reserved.
 
 
Connected to:
Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
Version 23.5.0.24.07
 
SQL>
User altered.
 
SQL>
User altered.
 
SQL>
Session altered.
 
SQL>
User altered.
 
SQL> Disconnected from Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
Version 23.5.0.24.07

Either you use SQL Developer to connect or you can test it first with something like:

PS C:\> podman EXEC -it e4be1325622d sqlplus pdbadmin/secure_password@FREEPDB1
 
SQL*Plus: RELEASE 23.0.0.0.0 - FOR Oracle Cloud AND Engineered Systems ON Thu Nov 7 13:54:21 2024
Version 23.5.0.24.07
 
Copyright (c) 1982, 2024, Oracle.  ALL rights reserved.
 
 
Connected TO:
Oracle DATABASE 23ai Free RELEASE 23.0.0.0.0 - Develop, Learn, AND Run FOR Free
Version 23.5.0.24.07
 
SQL> SELECT banner FROM v$version;
 
BANNER
--------------------------------------------------------------------------------
Oracle DATABASE 23ai Free RELEASE 23.0.0.0.0 - Develop, Learn, AND Run FOR Free

If you want to connect to the OS of your container use:

PS C:\> podman exec -it e4be1325622d /bin/bash
bash-4.4$

It obviously also works very well with SQLcl (or SQL Developer):

PS C:\Program Files\sqlcl\bin> ./SQL.exe SYSTEM/"secure_password"@//localhost:1551/freepdb1
Picked up JAVA_TOOL_OPTIONS: -Duser.language=en
 
 
SQLcl: RELEASE 24.2 Production ON Thu Nov 07 15:01:25 2024
 
Copyright (c) 1982, 2024, Oracle.  ALL rights reserved.
 
LAST SUCCESSFUL login TIME: Thu Nov 07 2024 15:01:28 +01:00
 
Connected TO:
Oracle DATABASE 23ai Free RELEASE 23.0.0.0.0 - Develop, Learn, AND Run FOR Free
Version 23.5.0.24.07
 
SQL> SELECT banner FROM v$version;
 
BANNER
__________________________________________________________________________________
Oracle DATABASE 23ai Free RELEASE 23.0.0.0.0 - Develop, Learn, AND Run FOR Free

Kafka Java Client for Oracle Transactional Event Queues setup

Create an OKafka topic with:

SQL> EXEC dbms_aqadm.create_database_kafka_topic(topicname => 'okafka01');
 
PL/SQL PROCEDURE successfully completed.

Remark:
This type of queue is started by default. Even if in the background what’s created is a TxEventQ…

It creates the usual Oracle tables:

SQL> SELECT * FROM okafka01 ORDER BY enqueue_time DESC;
 
no ROWS selected
SQL> SELECT * FROM aq$okafka01 ORDER BY enq_timestamp DESC;
 
no ROWS selected

Kafka Java Client for Oracle Transactional Event Queues testing

I have started with okafka 23.4 as it is the current version on the GitHub official repository but faced this error when creating the producer:

Exception in thread "main" java.lang.NoSuchFieldError: Class org.apache.kafka.clients.ClientDnsLookup does not have member field 'org.apache.kafka.clients.ClientDnsLookup DEFAULT'

On MVN Repository I noticed a 23.5 version that solved above issue…

Taking the latest Kafka Client 3.9.0 I had this error (same as with Kafka Client 3.8.1 or 3.8.0):

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.kafka.common.record.MemoryRecordsBuilder org.apache.kafka.common.record.MemoryRecords.builder(java.nio.ByteBuffer, byte, org.apache.kafka.common.record.CompressionType, org.apache.kafka.common.record.TimestampType, long)'

But then, saw in OKafka official GitHub, OKafka has a strong dependency on Kafka Client 3.7.1, using the correct jar solved the above issue…

To make it working you must also setup Simple Logging Facade for Java (SLF4J) API with a logging framework. I have decided to use Simple logger.

You also need to refer to OKafka GitHub page to have the list of dependencies. At the end what I have added to my project:

okafka01
okafka01

My simple Java code is inserting a JSON message with an unique key. I have a pause between the producer and consumer steps to allow you to select into the database:

package okafka01;
 
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
 
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
 
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.io.InputStream;
import java.io.FileNotFoundException;
import java.util.concurrent.Future;
import java.util.Random;
import pl.allegro.finance.tradukisto.ValueConverters;
import java.util.Arrays;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class okafka01 {
  public static void main(String[] args) {
    InputStream inputStream;
    Properties props = new Properties();
    Random rand = new Random();
    ValueConverters intConverter = ValueConverters.ENGLISH_INTEGER;
 
    //System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");
    System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "error");
    Logger logger = LoggerFactory.getLogger(okafka01.class);
    // JDK14 logging framework
    /*LogManager logManager = LogManager.getLogManager();
    try {
        logManager.readConfiguration( new FileInputStream("logging.properties") );
      } catch ( IOException exception ) {
        logger.error( "Cannot read configuration file", exception );
      }*/
 
    try {
      String propFileName = "config.properties";
      inputStream = okafka01.class.getClassLoader().getResourceAsStream(propFileName);
      if (inputStream != null) {
        props.load(inputStream);
        inputStream.close();
      } else {
        throw new FileNotFoundException("Property file '" + propFileName + "' not found.");
      }
    } catch (Exception e) {
      System.out.println("Exception: " + e);
      logger.error("", e);
    }
 
    String topic = props.getProperty("topic.name", "OKAFKA01");
    int i = rand.nextInt(1000)+1;
    props.remove("topic.name");
    String baseMsg = "{\"id\":" + i + ",\"descr\":\"" + intConverter.asWords(i) + "\"}";
 
    // ***************************************************************************************************************
    // Creates OKafka Producer
    Producer<String, String> producer = new KafkaProducer<>(props);
    Instant startTime = Instant.now();
    //Produce one messages into topic named "TEQ".
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "Key " + i, baseMsg);
    Future<RecordMetadata> lastFuture;
    try {
      lastFuture = producer.send(producerRecord);
      long runTime = Duration.between(startTime, Instant.now()).toMillis();
      System.out.println("Produced message with payload " + baseMsg + ". Run Duration " + runTime);
      RecordMetadata  rd = lastFuture.get();
      System.out.println("Last record placed in partition " + rd.partition() + " Offset " + rd.offset());
    }
    catch(Exception e) {
      System.out.println("Failed to send messages: " + e );
      logger.error("", e);
    }
    finally {
      //Closes the OKafka producer
      producer.close();
    }
 
    // ***************************************************************************************************************
    // Pause between produce and consumer to allow to check what happened at database level
    System.out.println("Press Enter to continue");
    try {
      System.in.read();
    }
    catch (Exception e) {
      logger.error("", e);
    }
 
    // ***************************************************************************************************************
    // Creates OKafka Consumer
    KafkaConsumer<String , String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
 
    int retries = 1;
    try {
      while(retries <= 5) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
 
        for (ConsumerRecord<String, String> record : records)
          System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n  ", record.partition(), record.offset(), record.key(), record.value());
 
        if (records.count() > 0) {
          System.out.println("Committing records" + records.count());
          consumer.commitSync();
        } else {
          System.out.println("No Record Fetched. Retrying in 1 second");
          Thread.sleep(1000);
        }
        retries++;
      }
    }
    catch(Exception e) {
      System.out.println("Exception from consumer " + e);
      logger.error("", e);
    }
    finally {
      consumer.close();
    }
  }
}

My config.properties file is:

#Properties to connect to Oracle Database
#Option 1: Connect to Oracle database using plaintext
bootstrap.servers=127.0.0.1:1551
oracle.service.name=freepdb1
security.protocol=PLAINTEXT
oracle.net.tns_admin=.
 
#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
#security.protocol=SSL
#oracle.net.tns_admin=<location of Oracle Wallet, tnanames.ora and ojdbc.properties file>
#tns.alias=<tns alias>
 
#Appliction specific OKafka Producer properties
topic.name=OKAFKA01
 
allow.auto.create.topics=false
group.id=SUBSCRIBER01
# Start consuming from the beginning (Default = latest);
auto.offset.reset=earliest
 
enable.idempotence=true
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

My ojdbc.properties file is:

user=yjaquier
password=secure_password

And finally (in the case of JDK14 logging framework if you decide to use it is) the logging.peoperties file:

# level : OFF / SEVERE / WARNING / INFO / CONFIG / FINE / FINER / FINEST / ALL
.level=ALL

The hierarchy where to put the different files:

okafka02
okafka02

The console of the execution:

okafka03
okafka03

From Oracle after the message has been produced. I have not been able to extract the content of userdata_blob column:

SQL> SELECT enqueue_time, correlation, TO_CHAR(userdata_blob), utl_raw.cast_to_varchar2(userdata_raw) AS userdata_raw FROM okafka01 ORDER BY enqueue_time DESC;
 
ENQUEUE_TIME                           CORRELATION    TO_CHAR(USERDATA_BLOB)    USERDATA_RAW
______________________________________ ______________ _________________________ ________________________________________________________________
20-NOV-24 01.49.35.344705000 PM GMT    KEY 385                                  KEY 385.{"id":385,"descr":"three hundred eighty-five"}
 
SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$okafka01 ORDER BY enq_timestamp DESC;
 
MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                      DEQ_TIMESTAMP    CONSUMER_NAME
___________________________________ ____________ __________________________________ ________________ ________________
02000000000000000000000000660000    READY        20-NOV-24 02.49.35.344705000 PM                     SUBSCRIBER01

This is also automatically creating a subscriber:

SQL> SELECT * FROM dba_queue_subscribers WHERE owner='YJAQUIER';
 
OWNER       QUEUE_NAME    QUEUE_TABLE    CONSUMER_NAME    ADDRESS       PROTOCOL TRANSFORMATION    RULE    DELIVERY_MODE             IF_NONDURABLE_SUBSCRIBER    QUEUE_TO_QUEUE       SUBSCRIBER_ID    POS_BITMAP
___________ _____________ ______________ ________________ __________ ___________ _________________ _______ _________________________ ___________________________ _________________ ________________ _____________
YJAQUIER    OKAFKA01      OKAFKA01       SUBSCRIBER01                          0                           PERSISTENT_OR_BUFFERED    NO                          FALSE                            1             0

Once the message has been consumed:

SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$okafka01 ORDER BY enq_timestamp DESC;
 
MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                      DEQ_TIMESTAMP                      CONSUMER_NAME
___________________________________ ____________ __________________________________ __________________________________ ________________
02000000000000000000000000660000    PROCESSED    20-NOV-24 02.49.35.344705000 PM    20-NOV-24 02.52.18.489923000 PM    SUBSCRIBER01

While testing if you want to purge all the existing message use something like:

DECLARE
  purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
  purge_options.block := TRUE;
  purge_options.delivery_mode := dbms_aq.persistent_or_buffered;
  dbms_aqadm.purge_queue_table(queue_table => 'okafka01', purge_condition => NULL, purge_options => purge_options);
END;
/

References

About Post Author

Share the knowledge!

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>