Preamble
Kafka Java Client for Oracle Transactional Event Queues is Oracle's answer for transparently replacing a Java Kafka application that produces and consumes events to/from a Kafka cluster. And so replacing your Kafka cluster with an Oracle database…
The first question I asked myself: why the hell would I replace my Kafka cluster with an Oracle database? First of all, inside an Oracle database, what replaces (or complements) a Kafka cluster is Transactional Event Queues (TxEventQ).
This is explained in the Apache Kafka versus Oracle Transactional Event Queues (TEQ) as Microservices Event Mesh blog post, and the key arguments are roughly:
- The Oracle database now supports all modern datatypes: JSON, XML, graph, spatial, vector, …
- The Oracle database has a huge ecosystem of supported languages to access it: Python, TypeScript, …
- All the high availability and manageability features that come with the Oracle database
- Direct access to the data by other Oracle-based applications
But obviously, at the end of the day, you have to find your own benefit(s) to go with TxEventQ and the Oracle Kafka client…
At the time of writing this blog post, the official Oracle documentation is really misleading: in the 21c Transactional Event Queues and Advanced Queuing User's Guide, the chapter Kafka Java Client Interface for Oracle Transactional Event Queues gives you the feeling that you can use this database release. But the official Oracle Kafka GitHub page clearly specifies that this client only works with the Oracle Database 23ai Free version as well as Oracle Database 23ai available on the Oracle Autonomous Cloud platform.
And in any case the DBMS_AQADM.CREATE_DATABASE_KAFKA_TOPIC procedure does not exist in 21c…
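If you want to double-check on your own database, a minimal JDBC sketch querying ALL_PROCEDURES can tell you whether the procedure is there. This is only a sketch: the endpoint and pdbadmin credentials are the ones from the container built below, and visibility in ALL_PROCEDURES depends on your privileges:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CheckKafkaTopicProc {
    public static void main(String[] args) throws Exception {
        // Endpoint and credentials match the container created later in this post
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:oracle:thin:@//localhost:1551/freepdb1", "pdbadmin", "secure_password");
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT COUNT(*) FROM all_procedures "
               + "WHERE object_name = 'DBMS_AQADM' AND procedure_name = 'CREATE_DATABASE_KAFKA_TOPIC'");
             ResultSet rs = ps.executeQuery()) {
            rs.next();
            System.out.println(rs.getInt(1) > 0
                ? "CREATE_DATABASE_KAFKA_TOPIC is available"
                : "CREATE_DATABASE_KAFKA_TOPIC not found (database release too old?)");
        }
    }
}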
So I have used Podman 5.3.0, the Oracle 23ai Free (23.5) container and IntelliJ IDEA 2024.3 (Community Edition) to test the feature.
Podman quick setup
I am not going into a complete Podman setup as this is clearly not the scope of this article. After tons of the (usual) problems with my company proxy, I finally initialized the Podman machine with the --user-mode-networking option to route traffic through a host user-space process and benefit from my Windows laptop proxy configuration:
PS C:\> podman machine init --user-mode-networking
Looking up Podman Machine image at quay.io/podman/machine-os-wsl:5.3 to create VM
Getting image source signatures
Copying blob 70404e8d767d done   |
Copying config 44136fa355 done   |
Writing manifest to image destination
70404e8d767d50e97fc895a6cdf962922662c309b2afe682bbdf6982ce02d550
Extracting compressed file: podman-machine-default-amd64: done
Importing operating system into WSL (this may take a few minutes on a new WSL install)...
Import in progress, this may take a few minutes.
The operation completed successfully.
Installing user-mode networking distribution...
Import in progress, this may take a few minutes.
The operation completed successfully.
Configuring system...
Machine init complete
To start your machine run:

        podman machine start
My first Podman pull failed with an (again, usual) certificate error (my company proxy replaces web site certificates with its own certificates). To solve it I would need to copy my Zscaler proxy certificates into Podman or use a trick:
PS C:\> podman pull container-registry.oracle.com/database/free:latest
Trying to pull container-registry.oracle.com/database/free:latest...
Error: initializing source docker://container-registry.oracle.com/database/free:latest: pinging container registry container-registry.oracle.com: Get "https://container-registry.oracle.com/v2/": tls: failed to verify certificate: x509: certificate signed by unknown authority
I have tried to put my Zscaler certificates in C:\Users\ but finally went for the trick of simply disabling TLS verification:
PS C:\> podman pull --tls-verify=false container-registry.oracle.com/database/free:latest
Trying to pull container-registry.oracle.com/database/free:latest...
Getting image source signatures
Copying blob sha256:ecb4fe8cffd3eb8e7349984bad653ddfdf48cd41912cf122b4fff7ef87b030c9
Copying blob sha256:bd48d3f50dd7a3c0d666b758841cbad5de9471a7241b75492c0da564e8bc5aa4
Copying blob sha256:30950d539508065e4579e853745173a482835e9fdcbc7289cf7cbd651b8d0ddb
Copying blob sha256:52be51345d28fde108b3348b9cc0030c668da4cba4e3d57ce2bbab37aa6e2746
Copying blob sha256:c6905a973195abaadaa3b450e399df761e7bcbaaf4646d0e30807e7948a7475c
Copying blob sha256:3776f597231403d968231816a6dae9a5d1b5ae2321cbd330e06dfd46cb575a3b
Copying blob sha256:b65c5eddd16b6a3592eb8784ccc8b23bb064269ccf938554608a09f5a89edd34
Copying blob sha256:a46ca36d2998cca56fdbae2161af59fb0155bd6d32f32c72d782ec187e0b5bc0
Copying blob sha256:28c76d32849a75ee2e5f342694a8c2e62b2701204ed3df4c5489658be0e6ded5
Copying blob sha256:af2c542e5953b7919ce90814c6c91edee4abf6bad18523fcbd4e1f5461226ca3
Copying blob sha256:85b3ebaf2b5715dbcb0930c30b58c842e6b8ca14210fd482e3981d5b0808fb7a
Copying blob sha256:60ae4f7af338132b29218f431465cf759ffc0beb9999997f2e2fb1f0e03628a4
Copying blob sha256:887e445c4c0634434337c4c5eee5a0ab45cb239bd33b8cfb058ebed8750e3557
Copying blob sha256:c4cee0c7a6c25fefeead84d26068cbdd4f6ec1a335ae422fbdf9aaf863af6da9
Copying blob sha256:c248d1418febceee657af4a4290aebd6b992bcc842740765cbb3b0e46b4e204b
Copying blob sha256:644079d386d8e8452e275e6cd20bfe4da0221a64f1665b17164cb22d4f3bd7fa
Copying blob sha256:df8f4eb20bb1e711b3dd42d4769d70932c39839364658519788a1af799902745
Copying blob sha256:9ec103b6e2752a3992fac4c5342a9b3ff798eb5b87f707ff55d48ddd07754ff9
Copying blob sha256:f6a8af0be82f807ef73999571d7c5bf25722307c43d5bb1d637edec1024f0fe8
Copying blob sha256:92f086a2c5c6daee85e9e14eeca6e2c5ed4d929e74b983d5540b7a20098c7a52
Copying blob sha256:6bb22823d4eebc727ec772a5ed230f21eb063dd6dbbbfdb37bf71886293f1e78
Copying config sha256:e1ff84cf03edd29bdc63d9a7d868168c262c95c9039778598db14e4bce15fa53
Writing manifest to image destination
e1ff84cf03edd29bdc63d9a7d868168c262c95c9039778598db14e4bce15fa53
PS C:\> podman images
REPOSITORY                                   TAG         IMAGE ID      CREATED       SIZE
container-registry.oracle.com/database/free  latest      e1ff84cf03ed  2 months ago  9.48 GB
PS C:\> podman run --name ora23ai -p 1551:1521 --detach container-registry.oracle.com/database/free:latest
e4be1325622d7e62f5a9dbc93e0d9f42dd8e09acdb9c74ff5a230d0e4d94ecf2
PS C:\> podman ps --all
CONTAINER ID  IMAGE                                               COMMAND               CREATED         STATUS                    PORTS                             NAMES
e4be1325622d  container-registry.oracle.com/database/free:latest  /bin/bash -c $ORA...  17 seconds ago  Up 18 seconds (starting)  0.0.0.0:1551->1521/tcp, 1521/tcp  ora23ai
As I have not used the -e ORACLE_PWD parameter when creating the container, I must change the SYS, SYSTEM and PDB_ADMIN passwords with:
PS C:\> podman exec e4be1325622d ./setPassword.sh secure_password
The Oracle base remains unchanged with value /opt/oracle

SQL*Plus: Release 23.0.0.0.0 - for Oracle Cloud and Engineered Systems on Thu Nov 7 13:51:16 2024
Version 23.5.0.24.07

Copyright (c) 1982, 2024, Oracle. All rights reserved.

Connected to:
Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
Version 23.5.0.24.07

SQL>
User altered.

SQL>
User altered.

SQL>
Session altered.

SQL>
User altered.

SQL> Disconnected from Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
Version 23.5.0.24.07
Either you use SQL Developer to connect, or you can first test it with something like:
PS C:\> podman exec -it e4be1325622d sqlplus pdbadmin/secure_password@FREEPDB1

SQL*Plus: Release 23.0.0.0.0 - for Oracle Cloud and Engineered Systems on Thu Nov 7 13:54:21 2024
Version 23.5.0.24.07

Copyright (c) 1982, 2024, Oracle. All rights reserved.

Connected to:
Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
Version 23.5.0.24.07

SQL> SELECT banner FROM v$version;

BANNER
--------------------------------------------------------------------------------
Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
If you want to connect to the OS of your container use:
PS C:\> podman exec -it e4be1325622d /bin/bash
bash-4.4$
It obviously also works very well with SQLcl (or SQL Developer):
PS C:\Program Files\sqlcl\bin> ./sql.exe SYSTEM/"secure_password"@//localhost:1551/freepdb1
Picked up JAVA_TOOL_OPTIONS: -Duser.language=en

SQLcl: Release 24.2 Production on Thu Nov 07 15:01:25 2024

Copyright (c) 1982, 2024, Oracle. All rights reserved.

Last Successful login time: Thu Nov 07 2024 15:01:28 +01:00

Connected to:
Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
Version 23.5.0.24.07

SQL> SELECT banner FROM v$version;

BANNER
__________________________________________________________________________________
Oracle Database 23ai Free Release 23.0.0.0.0 - Develop, Learn, and Run for Free
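Before moving to OKafka itself, the same connection can also be validated from Java. A minimal JDBC smoke test, as a sketch: it assumes an ojdbc driver on the classpath and reuses the endpoint and SYSTEM credentials of the SQLcl test above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same endpoint as the SQLcl test above
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:oracle:thin:@//localhost:1551/freepdb1", "system", "secure_password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT banner FROM v$version")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}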
Kafka Java Client for Oracle Transactional Event Queues setup
Create an OKafka topic with:
SQL> EXEC dbms_aqadm.create_database_kafka_topic(topicname => 'okafka01');

PL/SQL procedure successfully completed.
Remark:
This type of queue is started by default, even if what is created in the background is a TxEventQ…
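Note that you can also create the topic from Java: the OKafka distribution ships an Admin client implementing a subset of the Kafka Admin API. The sketch below is based on my reading of the OKafka GitHub samples; treat the exact classes, partition count and replication factor handling as assumptions to verify against your OKafka version:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.admin.AdminClient;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same connection properties as the config.properties shown later in this post
        props.put("bootstrap.servers", "127.0.0.1:1551");
        props.put("oracle.service.name", "freepdb1");
        props.put("security.protocol", "PLAINTEXT");
        props.put("oracle.net.tns_admin", ".");

        // The OKafka AdminClient maps createTopics() to a TxEventQ under the hood
        try (Admin admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singletonList(new NewTopic("OKAFKA01", 1, (short) 1)))
                 .all().get();
            System.out.println("Topic OKAFKA01 created");
        }
    }
}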
It creates the usual Oracle tables:
SQL> SELECT * FROM okafka01 ORDER BY enqueue_time DESC;

no rows selected

SQL> SELECT * FROM aq$okafka01 ORDER BY enq_timestamp DESC;

no rows selected
Kafka Java Client for Oracle Transactional Event Queues testing
I have started with okafka 23.4 as it was the current version on the official GitHub repository, but faced this error when creating the producer:
Exception in thread "main" java.lang.NoSuchFieldError: Class org.apache.kafka.clients.ClientDnsLookup does not have member field 'org.apache.kafka.clients.ClientDnsLookup DEFAULT' |
On MVN Repository I noticed a 23.5 version that solved the above issue…
Taking the latest Kafka client 3.9.0, I had this error (same as with Kafka client 3.8.1 or 3.8.0):
Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.kafka.common.record.MemoryRecordsBuilder org.apache.kafka.common.record.MemoryRecords.builder(java.nio.ByteBuffer, byte, org.apache.kafka.common.record.CompressionType, org.apache.kafka.common.record.TimestampType, long)' |
But then I saw on the official OKafka GitHub page that OKafka has a strong dependency on Kafka client 3.7.1; using the correct jar solved the above issue…
To make it work you must also set up the Simple Logging Facade for Java (SLF4J) API with a logging framework. I have decided to use the Simple Logger.
You also need to refer to the OKafka GitHub page for the list of dependencies. At the end, here is what I have added to my project (see the sketch below):
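If you are on Maven, the dependency list boils down to something like the following pom.xml fragment. It is a sketch: the okafka and kafka-clients versions are the ones discussed above, while the slf4j and tradukisto versions are assumptions to double-check on MVN Repository (tradukisto is only there for the ValueConverters used in the test code below):

<dependencies>
    <!-- Kafka Java Client for Oracle Transactional Event Queues (the 23.5 version discussed above) -->
    <dependency>
        <groupId>com.oracle.database.messaging</groupId>
        <artifactId>okafka</artifactId>
        <version>23.5.0.0</version>
    </dependency>
    <!-- OKafka has a strong dependency on this exact Kafka client version -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.1</version>
    </dependency>
    <!-- SLF4J API plus the Simple Logger binding (version is an assumption) -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.16</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.16</version>
    </dependency>
    <!-- Converts integers to words for the generated JSON payload (version is an assumption) -->
    <dependency>
        <groupId>pl.allegro.finance</groupId>
        <artifactId>tradukisto</artifactId>
        <version>1.12.0</version>
    </dependency>
</dependencies>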

My simple Java code inserts a JSON message with a unique key. I have added a pause between the producer and consumer steps to allow you to run selects in the database in the meantime:
package okafka01;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.io.InputStream;
import java.io.FileNotFoundException;
import java.util.concurrent.Future;
import java.util.Random;
import pl.allegro.finance.tradukisto.ValueConverters;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class okafka01 {
    public static void main(String[] args) {
        InputStream inputStream;
        Properties props = new Properties();
        Random rand = new Random();
        ValueConverters intConverter = ValueConverters.ENGLISH_INTEGER;

        //System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "error");
        Logger logger = LoggerFactory.getLogger(okafka01.class);

        // JDK14 logging framework
        /*LogManager logManager = LogManager.getLogManager();
        try {
            logManager.readConfiguration(new FileInputStream("logging.properties"));
        }
        catch (IOException exception) {
            logger.error("Cannot read configuration file", exception);
        }*/

        // Load the connection and OKafka properties from config.properties on the classpath
        try {
            String propFileName = "config.properties";
            inputStream = okafka01.class.getClassLoader().getResourceAsStream(propFileName);
            if (inputStream != null) {
                props.load(inputStream);
                inputStream.close();
            }
            else {
                throw new FileNotFoundException("Property file '" + propFileName + "' not found.");
            }
        }
        catch (Exception e) {
            System.out.println("Exception: " + e);
            logger.error("", e);
        }

        String topic = props.getProperty("topic.name", "OKAFKA01");
        int i = rand.nextInt(1000) + 1;
        props.remove("topic.name");
        String baseMsg = "{\"id\":" + i + ",\"descr\":\"" + intConverter.asWords(i) + "\"}";

        // ***************************************************************************************************************
        // Creates OKafka producer
        Producer<String, String> producer = new KafkaProducer<>(props);
        Instant startTime = Instant.now();
        // Produce one message into the topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "Key " + i, baseMsg);
        Future<RecordMetadata> lastFuture;
        try {
            lastFuture = producer.send(producerRecord);
            long runTime = Duration.between(startTime, Instant.now()).toMillis();
            System.out.println("Produced message with payload " + baseMsg + ". Run Duration " + runTime);
            RecordMetadata rd = lastFuture.get();
            System.out.println("Last record placed in partition " + rd.partition() + " Offset " + rd.offset());
        }
        catch (Exception e) {
            System.out.println("Failed to send messages: " + e);
            logger.error("", e);
        }
        finally {
            // Closes the OKafka producer
            producer.close();
        }

        // ***************************************************************************************************************
        // Pause between producer and consumer to allow checking what happened at database level
        System.out.println("Press Enter to continue");
        try {
            System.in.read();
        }
        catch (Exception e) {
            logger.error("", e);
        }

        // ***************************************************************************************************************
        // Creates OKafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        int retries = 1;
        try {
            while (retries <= 5) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n",
                                      record.partition(), record.offset(), record.key(), record.value());
                if (records.count() > 0) {
                    System.out.println("Committing records " + records.count());
                    consumer.commitSync();
                }
                else {
                    System.out.println("No record fetched. Retrying in 1 second");
                    Thread.sleep(1000);
                }
                retries++;
            }
        }
        catch (Exception e) {
            System.out.println("Exception from consumer " + e);
            logger.error("", e);
        }
        finally {
            consumer.close();
        }
    }
}
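As a side note on the "transactional" part of the product name: the OKafka samples on GitHub suggest that the producer can expose its underlying JDBC connection, so that a business-table insert and the event publication commit or roll back together. A sketch only, with the caveat that getDBConnection(), the oracle.transactional.producer property and the transactional API support are assumptions to verify against your OKafka version (the ORDERS_DEMO table is hypothetical):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class TransactionalProduceSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same connection settings as config.properties below
        props.put("bootstrap.servers", "127.0.0.1:1551");
        props.put("oracle.service.name", "freepdb1");
        props.put("security.protocol", "PLAINTEXT");
        props.put("oracle.net.tns_admin", ".");
        props.put("oracle.transactional.producer", "true"); // assumption, see lead-in
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            // The producer exposes the JDBC connection it enqueues with, so this
            // insert into a hypothetical ORDERS_DEMO table and the event below
            // belong to the same database transaction
            Connection conn = producer.getDBConnection();
            try (PreparedStatement ps = conn.prepareStatement("INSERT INTO orders_demo(id) VALUES (?)")) {
                ps.setInt(1, 1);
                ps.executeUpdate();
            }
            producer.send(new ProducerRecord<>("OKAFKA01", "Key 1", "{\"id\":1}"));
            producer.commitTransaction();
        }
        catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
        finally {
            producer.close();
        }
    }
}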
My config.properties file is:
#Properties to connect to Oracle Database
#Option 1: Connect to Oracle database using plaintext
bootstrap.servers=127.0.0.1:1551
oracle.service.name=freepdb1
security.protocol=PLAINTEXT
oracle.net.tns_admin=.

#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
#security.protocol=SSL
#oracle.net.tns_admin=<location of Oracle Wallet, tnsnames.ora and ojdbc.properties file>
#tns.alias=<tns alias>

#Application specific OKafka Producer properties
topic.name=OKAFKA01
allow.auto.create.topics=false
group.id=SUBSCRIBER01
#Start consuming from the beginning (default = latest)
auto.offset.reset=earliest
enable.idempotence=true

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
My ojdbc.properties file (looked up in the oracle.net.tns_admin directory) is:
user=yjaquier
password=secure_password
And finally (if you decide to use the JDK14 logging framework), the logging.properties file:
# level : OFF / SEVERE / WARNING / INFO / CONFIG / FINE / FINER / FINEST / ALL
.level=ALL
The hierarchy where to put the different files:
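A sketch of a plausible layout, assuming a Maven-style IntelliJ project: config.properties is loaded from the classpath, so it goes under src/main/resources, while ojdbc.properties and logging.properties sit in the working directory (the project root here) because oracle.net.tns_admin=. :

okafka01/
├── pom.xml
├── logging.properties      (only if you use the JDK14 logging framework)
├── ojdbc.properties        (oracle.net.tns_admin=. points to the working directory)
└── src/
    └── main/
        ├── java/
        │   └── okafka01/
        │       └── okafka01.java
        └── resources/
            └── config.properties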

The console output of the execution:
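Roughly, it should look like this (the payload matches the run queried below; the duration, partition and offset values are illustrative):

Produced message with payload {"id":385,"descr":"three hundred eighty-five"}. Run Duration 1234
Last record placed in partition 0 Offset 0
Press Enter to continue

partition = 0, offset = 0, key = Key 385, value = {"id":385,"descr":"three hundred eighty-five"}
Committing records 1
No record fetched. Retrying in 1 second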

On the Oracle side, after the message has been produced (I have not been able to extract the content of the userdata_blob column):
SQL> SELECT enqueue_time, correlation, TO_CHAR(userdata_blob), utl_raw.cast_to_varchar2(userdata_raw) AS userdata_raw FROM okafka01 ORDER BY enqueue_time DESC;

ENQUEUE_TIME                           CORRELATION    TO_CHAR(USERDATA_BLOB)    USERDATA_RAW
______________________________________ ______________ _________________________ ________________________________________________________________
20-NOV-24 01.49.35.344705000 PM GMT    KEY 385                                  KEY 385.{"id":385,"descr":"three hundred eighty-five"}

SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$okafka01 ORDER BY enq_timestamp DESC;

MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                      DEQ_TIMESTAMP    CONSUMER_NAME
___________________________________ ____________ __________________________________ ________________ ________________
02000000000000000000000000660000    READY        20-NOV-24 02.49.35.344705000 PM                     SUBSCRIBER01
This also automatically creates a subscriber:
SQL> SELECT * FROM dba_queue_subscribers WHERE owner='YJAQUIER';

OWNER       QUEUE_NAME    QUEUE_TABLE    CONSUMER_NAME    ADDRESS    PROTOCOL    TRANSFORMATION    RULE    DELIVERY_MODE             IF_NONDURABLE_SUBSCRIBER    QUEUE_TO_QUEUE    SUBSCRIBER_ID    POS_BITMAP
___________ _____________ ______________ ________________ __________ ___________ _________________ _______ _________________________ ___________________________ _________________ ________________ _____________
YJAQUIER    OKAFKA01      OKAFKA01       SUBSCRIBER01                0                                     PERSISTENT_OR_BUFFERED    NO                          FALSE             1                0
Once the message has been consumed:
SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$okafka01 ORDER BY enq_timestamp DESC;

MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                      DEQ_TIMESTAMP                      CONSUMER_NAME
___________________________________ ____________ __________________________________ __________________________________ ________________
02000000000000000000000000660000    PROCESSED    20-NOV-24 02.49.35.344705000 PM    20-NOV-24 02.52.18.489923000 PM    SUBSCRIBER01
While testing, if you want to purge all the existing messages, use something like:
DECLARE
  purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
  purge_options.block := TRUE;
  purge_options.delivery_mode := dbms_aq.persistent_or_buffered;
  dbms_aqadm.purge_queue_table(queue_table => 'okafka01', purge_condition => NULL, purge_options => purge_options);
END;
/
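The same purge can be triggered from the Java side through plain JDBC if that is more convenient while iterating on the code; a sketch reusing the credentials from ojdbc.properties:

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

public class PurgeQueueTable {
    public static void main(String[] args) throws Exception {
        // Plain JDBC connection to the PDB used throughout this post
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//localhost:1551/freepdb1", "yjaquier", "secure_password")) {
            String plsql =
                "DECLARE "
              + "  purge_options dbms_aqadm.aq$_purge_options_t; "
              + "BEGIN "
              + "  purge_options.block := TRUE; "
              + "  purge_options.delivery_mode := dbms_aq.persistent_or_buffered; "
              + "  dbms_aqadm.purge_queue_table(queue_table => 'okafka01', "
              + "    purge_condition => NULL, purge_options => purge_options); "
              + "END;";
            // An anonymous PL/SQL block can be executed through a CallableStatement
            try (CallableStatement cs = conn.prepareCall(plsql)) {
                cs.execute();
            }
            System.out.println("Queue table okafka01 purged");
        }
    }
}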
References
- Kafka APIs for Oracle Transactional Event Queues
- Kafka Client for Oracle Transactional Event Queues Quickstart Guide
- Kafka Java Client for Oracle Transactional Event Queues
- Replacing Kafka use cases with powerful in-built DB queues in modern applications by Sanjay Goil
- Oracle Database Free
- Welcome to the Oracle Container Registry
- Docker Images from Oracle
- How to install Linux on Windows with WSL
- Podman for Windows