Preamble
Transactional Event Queues (TxEventQ) and Advanced Queuing (AQ) are queuing systems integrated with the Oracle database. Both features are included at no extra cost in all Oracle database editions, and TxEventQ also includes a limited license of Oracle Partitioning. Such queueing systems are nowadays extremely popular with web, mobile, IoT, and other data-driven/event-driven applications: producers enqueue messages and consumers dequeue them. Apache Kafka is one of the best-known solutions in this space…
From what I have read, TxEventQ is an evolution of AQ and is the way to go:
Transactional Event Queues (TxEventQ) is a highly optimized implementation of AQ previously called AQ Sharded Queues
TxEventQ is also the only solution that can be interfaced with the Oracle Kafka Java client, but this is out of scope for this blog post and we will cover it in another one…
Oracle's recommendation on which technology to use is:
Message type | Old Name | New Name |
---|---|---|
Persistent messages | AQ classic queues | AQ queues |
Persistent messages | AQ Sharded queues | TxEventQ queues |
Buffered messages | AQ classic queues | AQ buffered queues |
Compared with an Apache Kafka architecture, TxEventQ leverages all the features of an Oracle database: high availability, scalability, reliability, and storage in SQL-accessible objects…
The big picture, inspired by the Oracle documentation, is as follows (you can transform messages while enqueuing and dequeuing; check the DBA_TRANSFORMATIONS view and the DBMS_TRANSFORM package to see how):

I have tested TxEventQ and AQ with Oracle Enterprise Edition 21c (21.16) running on a server with 12 cores and 64GB of memory under Red Hat Enterprise Linux release 8.7 (Ootpa).
From what I have seen, even in the latest release available at the time of writing this post, I rate the feature as not mature enough for production use: it still lacks basics that work perfectly with AQ!
Transactional Event Queues (TxEventQ) creation
Create the TxEventQ with:
DECLARE
  queue_properties dbms_aqadm.queue_props_t;
BEGIN
  -- For testing purposes, delete messages 10 seconds after they have been dequeued
  queue_properties.retention_time := 10;
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'txeventq01',
    multiple_consumers => TRUE,
    comment            => 'My TxEventQ',
    queue_payload_type => 'json',
    queue_properties   => queue_properties);
  dbms_aqadm.start_queue(queue_name => 'txeventq01');
END;
/

PL/SQL procedure successfully completed.

I had many issues when I forgot this step, so add one or more subscribers to your TxEventQ queue with:

DECLARE
  subscriber01 sys.aq$_agent;
BEGIN
  subscriber01 := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'txeventq01', subscriber => subscriber01);
END;
/

PL/SQL procedure successfully completed.

You can use the DBA views to check that everything has been created:
SQL> SELECT * FROM dba_queues WHERE owner='YJAQUIER'; OWNER NAME QUEUE_TABLE QID QUEUE_TYPE MAX_RETRIES RETRY_DELAY ENQUEUE_ENABLED DEQUEUE_ENABLED RETENTION USER_COMMENT NETWORK_NAME SHARDED QUEUE_CATEGORY RECIPIENTS ___________ _____________ ______________ ________ _______________ ______________ ______________ __________________ __________________ ____________ _______________ _______________ __________ ____________________________ _____________ YJAQUIER TXEVENTQ01 TXEVENTQ01 77708 NORMAL_QUEUE 5 0 YES YES 10 My TxEventQ TRUE Transactional Event Queue MULTIPLE SQL> SELECT * FROM dba_queue_subscribers WHERE owner='YJAQUIER'; OWNER QUEUE_NAME QUEUE_TABLE CONSUMER_NAME ADDRESS PROTOCOL TRANSFORMATION RULE DELIVERY_MODE IF_NONDURABLE_SUBSCRIBER QUEUE_TO_QUEUE SUBSCRIBER_ID POS_BITMAP ___________ _____________ ______________ ________________ __________ ___________ _________________ _______________________________ ________________ ___________________________ _________________ ________________ _____________ YJAQUIER TXEVENTQ01 TXEVENTQ01 SUBSCRIBER01 0 PERSISTENT NO FALSE 1 0 |
When testing you can clean everything with:

BEGIN
  dbms_aqadm.stop_queue(queue_name => 'txeventq01');
  dbms_aqadm.drop_transactional_event_queue(queue_name => 'txeventq01');
END;
/

PL/SQL procedure successfully completed.

You can also just flush the queue content with:

DECLARE
  purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
  purge_options.block := FALSE;
  purge_options.delivery_mode := dbms_aq.persistent;
  dbms_aqadm.purge_queue_table(
    queue_table     => 'txeventq01',
    purge_condition => NULL,
    purge_options   => purge_options);
END;
/
Enqueue/dequeue testing with PL/SQL
Enqueue a message
SET serveroutput ON SIZE unlimited
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            json;
BEGIN
  -- create the message payload
  message := json('{"id": 1, "descr": "Subscriber01"}');
  -- set the consumer name
  --message_properties.correlation := 'subscriber01';
  --message_properties.recipient_list.extend;
  recipients(1) := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0); -- You can add multiple
  --recipients(2) := sys.aq$_agent(name => 'subscriber02', address => null, protocol => 0); -- You can add multiple
  message_properties.recipient_list := recipients;
  -- enqueue the message and display the returned message_id
  dbms_aq.enqueue(
    queue_name         => 'txeventq01',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);
  dbms_output.put_line('Message enqueued with message_id: ' || message_handle);
  -- commit the transaction
  COMMIT;
END;
/
Message enqueued with message_id: 00000000000000000200000004660000

PL/SQL procedure successfully completed.

Remark:
I have kept in comments the various attempts I made. Trying to enqueue with DBMS_AQ.ENQUEUE a payload that is not of JMS type either does not work or fails outright with an ORA-00600 error:

ERROR at line 1:
ORA-00600: internal error code, arguments: [FILE:psdmsc.c LINE:1262 ID:OCIKCallPush], [], [], [], [], [], [], [], [], [], [], []
ORA-06512: at "SYS.DBMS_AQ", line 202
ORA-06512: at line 18
Dequeue a message
SET serveroutput ON SIZE unlimited
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            json;
BEGIN
  -- dequeue_mode determines whether we consume the message or just browse it and leave it there
  dequeue_options.dequeue_mode := dbms_aq.remove;
  -- wait controls how long to wait for a message to arrive before giving up
  dequeue_options.wait := dbms_aq.no_wait;
  -- we must specify navigation so we know where to look in the TxEventQ
  dequeue_options.navigation := dbms_aq.first_message;
  -- set the consumer name
  dequeue_options.consumer_name := 'subscriber01';
  -- perform the dequeue
  dbms_aq.dequeue(
    queue_name         => 'txeventq01',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);
  -- print out the message payload (the buffer must be large enough, TRUNCATE cuts anything beyond it)
  dbms_output.put_line(json_serialize(message RETURNING VARCHAR2(4000) TRUNCATE ERROR ON ERROR));
  -- commit the transaction
  COMMIT;
END;
/
{"id":1,"descr":"One"}

PL/SQL procedure successfully completed.

A few queries to check what happened at the database level:

SELECT * FROM aq$txeventq01 ORDER BY enq_timestamp DESC;
SELECT * FROM txeventq01 ORDER BY enqueue_time DESC;
SELECT * FROM v$eq_message_cache;
SELECT * FROM v$eq_subscriber_load;
SELECT * FROM v$eq_dequeue_sessions;
Enqueue/dequeue testing with Python
For Python virtual environments I use Miniconda. To comply with the new Conda licensing model, first add the conda-forge channel and remove the default one with:

$ conda config --add channels conda-forge
$ conda config --remove channels defaults

I then created a dedicated environment with Python 3.13 using:

$ conda create -n "oracle" python=3.13

Once in this environment, install the oracledb Python package with:

$ pip install oracledb

In the oracledb documentation you will see that Advanced Queuing can only be used with a thick client:
Oracle Advanced Queuing is only supported in the python-oracledb Thick mode. See Enabling python-oracledb Thick mode.
I have used Instant Client 23.5 that I installed in C:\Program Files\instantclient_23_5, so the code to use it…
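The platform-dependent choice of the Instant Client directory can be isolated in a small helper. This is only a sketch: the function name `instant_client_dir` is mine, and the macOS and Windows paths are simply the ones from my own machines, so adjust them to your installation.

```python
import os
import platform
from typing import Optional


def instant_client_dir(system: Optional[str] = None,
                       home: Optional[str] = None) -> Optional[str]:
    """Return the directory to pass as lib_dir, or None when none is needed.

    The paths are assumptions taken from my own setup, not defaults of
    python-oracledb.
    """
    system = system or platform.system()
    if system == "Darwin":  # macOS
        home = home or os.environ.get("HOME", "")
        return f"{home}/Downloads/instantclient_23_3"
    if system == "Windows":
        return r"C:\Program Files\instantclient_23_5"
    # Linux: no directory should be passed, the system library path is used
    return None


print(instant_client_dir("Windows"))
```

The result is meant to be handed to `oracledb.init_oracle_client(lib_dir=instant_client_dir())` before opening the connection.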
Enqueue a message
import os
import platform
import random

import oracledb
from num2words import num2words

d = None  # On Linux, no directory should be passed
if platform.system() == "Darwin":  # macOS
    d = os.environ.get("HOME") + "/Downloads/instantclient_23_3"
elif platform.system() == "Windows":  # Windows
    d = r"C:\Program Files\instantclient_23_5"
oracledb.init_oracle_client(lib_dir=d)

connection = oracledb.connect(dsn='euls20092.sgp.st.com:1531/pdb1', user='yjaquier', password='secure_password')

# sql = "select user_data from yjaquier.txeventq01 order by enqueue_time desc"
# with connection.cursor() as cursor:
#     print("Get all rows via an iterator")
#     for result in cursor.execute(sql):
#         print(result)

queue = connection.queue("TXEVENTQ01", "JSON")
# queue = connection.queue("AQ01", "JSON")

i = random.randint(1, 100)
json_data = [dict(id=i, descr=num2words(i).capitalize())]
for data in json_data:
    message_property = connection.msgproperties(recipients=["subscriber01", "subscriber02"], payload=data)
    queue.enqone(message_property)
    print('Payload: ' + str(data))
    print('Message enqueued with message_id: ' + message_property.msgid.hex())
connection.commit()

When I execute:

$ python oracle_enqueue.py
Payload: {'id': 89, 'descr': 'Eighty-nine'}
Message enqueued with message_id: 2589152b4f8cdb5fe063402b4b0ad02d
Dequeue a message
import os
import platform

import oracledb

d = None  # On Linux, no directory should be passed
if platform.system() == "Darwin":  # macOS
    d = os.environ.get("HOME") + "/Downloads/instantclient_23_3"
elif platform.system() == "Windows":  # Windows
    d = r"C:\Program Files\instantclient_23_5"
oracledb.init_oracle_client(lib_dir=d)

connection = oracledb.connect(dsn='euls20092.sgp.st.com:1531/pdb1', user='yjaquier', password='secure_password')

# sql = "select user_data from yjaquier.txeventq01"
# with connection.cursor() as cursor:
#     print("Get all rows via an iterator")
#     for result in cursor.execute(sql):
#         print(result)

queue = connection.queue("TXEVENTQ01", "JSON")
# queue = connection.queue("AQ01", "JSON")

# Dequeue as subscriber01, do not wait if the queue is empty, and remove the message
queue.deqoptions.consumername = "subscriber01"
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.mode = oracledb.DEQ_REMOVE

message = queue.deqone()
if message:
    print(message.payload)
else:
    print("No message to dequeue")
connection.commit()

When I execute:

$ python oracle_dequeue.py
{'id': Decimal('89'), 'descr': 'Eighty-nine'}
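As the output shows, numbers in a JSON payload come back as `Decimal`, which the standard `json` module refuses to serialize. Below is a minimal sketch of one way to turn dequeued payloads back into JSON text and to drain a queue until it is empty. The helper names `payload_to_json` and `drain` are mine, and the queue used here is a stand-in object so that the example runs without a database; with a real python-oracledb queue this assumes the wait option is set to `oracledb.DEQ_NO_WAIT`, so that `deqone()` returns `None` when nothing is left.

```python
import json
from decimal import Decimal
from types import SimpleNamespace


def _json_default(value):
    """Convert the types the json module cannot serialize on its own."""
    if isinstance(value, Decimal):
        # Whole numbers become int, anything else float (may lose precision)
        return int(value) if value == value.to_integral_value() else float(value)
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def payload_to_json(payload) -> str:
    """Serialize a dequeued payload (dict/list with Decimal numbers) to JSON text."""
    return json.dumps(payload, default=_json_default)


def drain(queue) -> list:
    """Call deqone() until it returns None and collect the payloads as JSON text."""
    results = []
    while True:
        message = queue.deqone()
        if message is None:
            break
        results.append(payload_to_json(message.payload))
    return results


# Stand-in for an oracledb queue (hypothetical, only for this example)
_pending = iter([SimpleNamespace(payload={"id": Decimal("89"), "descr": "Eighty-nine"}), None])
fake_queue = SimpleNamespace(deqone=lambda: next(_pending))

drained = drain(fake_queue)
print(drained)
```

With a real connection, remember to call `connection.commit()` after draining, otherwise the dequeue is rolled back.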
Enqueue/dequeue testing with Java
Errors encountered
While developing my code I first hit silly dependency errors like:

Caused by: java.lang.ClassNotFoundException: jakarta.transaction.Synchronization

You get this one if you do not add the Jakarta Transaction API to your project. What I ended up with as dependencies:

I finally got this strange error:
Exception in thread "main" oracle.jakarta.jms.AQjmsException: JMS-190: Queue yjaquier.txeventq01 not found |
As I was 100% sure that the owner and queue names were right, it took a bit of time to figure out that Oracle Java Message Service (Oracle JMS) expects a queue with the standard JMS message type and not a queue with a JSON payload:
Oracle Java Message Service (Oracle JMS) supports the standard JMS interfaces and has extensions to support administrative operations and other features that are not a part of the standard.
In the end, create a txeventq02 TxEventQ with the default message type:

DECLARE
  queue_properties dbms_aqadm.queue_props_t;
  subscriber01     sys.aq$_agent;
BEGIN
  -- For testing purposes, delete messages 10 seconds after they have been dequeued
  queue_properties.retention_time := 10;
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'txeventq02',
    multiple_consumers => TRUE,
    comment            => 'My TxEventQ',
    queue_payload_type => dbms_aqadm.jms_type, -- default value
    queue_properties   => queue_properties);
  dbms_aqadm.start_queue(queue_name => 'txeventq02');
  subscriber01 := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'txeventq02', subscriber => subscriber01);
END;
/

The two queue tables are quite different:
SQL> DESC yjaquier.txeventq01 Name NULL? TYPE ----------------------------------------- -------- ---------------------------- MSGID RAW(16) SHARD NUMBER(38) SEQ_NUM NUMBER(38) CORRELATION VARCHAR2(128) ENQUEUE_TIME TIMESTAMP(6) WITH TIME ZONE PRIORITY NUMBER(38) DELIVERY_TIME TIMESTAMP(6) WITH TIME ZONE EXPIRATION TIMESTAMP(6) WITH TIME ZONE STEP_NUMBER NUMBER(38) STATE NUMBER(38) SUBSHARD NUMBER(38) SUBSCRIBER_MAP RAW(1024) OLD_MSGID RAW(16) EXCEPTION_QUEUE VARCHAR2(128) USER_DATA JSON SQL> DESC yjaquier.txeventq02 Name NULL? TYPE ----------------------------------------- -------- ---------------------------- MSGID RAW(16) SHARD NUMBER(38) SEQ_NUM NUMBER(38) CORRELATION VARCHAR2(128) ENQUEUE_TIME TIMESTAMP(6) WITH TIME ZONE PRIORITY NUMBER(38) DELIVERY_TIME TIMESTAMP(6) WITH TIME ZONE EXPIRATION TIMESTAMP(6) WITH TIME ZONE STEP_NUMBER NUMBER(38) STATE NUMBER(38) SUBSHARD NUMBER(38) SUBSCRIBER_MAP RAW(1024) OLD_MSGID RAW(16) EXCEPTION_QUEUE VARCHAR2(128) JMS_MSG_TYPE NUMBER HEADER_PROPERTIES VARCHAR2(4000) USER_PROPERTIES_TEXT VARCHAR2(4000) USER_PROPERTIES_CLOB CLOB USERDATA_RAW RAW(2000) USERDATA_BLOB BLOB |
Enqueue and dequeue a message
My Java code is:
package txeventq01;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Random;

import jakarta.jms.JMSException;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
import oracle.AQ.AQException;
import oracle.jakarta.jms.AQjmsAgent;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.jakarta.jms.AQjmsSession;
import oracle.jakarta.jms.AQjmsTextMessage;
import oracle.jakarta.jms.AQjmsTopicPublisher;
import oracle.jakarta.jms.AQjmsTopicSubscriber;
import oracle.jdbc.pool.OracleDataSource;
import pl.allegro.finance.tradukisto.ValueConverters;

public class txeventq01 {
    private static final String username = "yjaquier";
    private static final String password = "secure_password";
    private static final String url = "jdbc:oracle:thin:@//server01.domain.com:1531/pdb1";
    private static final String topicName = "txeventq02";
    private static final String subscriberName = "subscriber01";

    private static void PublishTxEventQ(OracleDataSource ds) throws AQException, SQLException, JMSException {
        Random rand = new Random();
        ValueConverters intConverter = ValueConverters.ENGLISH_INTEGER;

        // create a JMS topic connection and session
        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        var session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // publish message
        Topic topic = session.getTopic(username, topicName);
        AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(topic);
        int i = rand.nextInt(1000) + 1;
        String baseMsg = "{\"id\":" + i + ",\"descr\":\"" + intConverter.asWords(i) + "\"}";
        System.out.println("Enqueued message: " + baseMsg);
        AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage(baseMsg);
        publisher.publish(message, new AQjmsAgent[]{new AQjmsAgent(subscriberName, null)});
        session.commit();

        // clean up
        publisher.close();
        session.close();
        conn.close();
    }

    private static void ConsumeTxEventQ(OracleDataSource ds) throws AQException, SQLException, JMSException {
        // create a JMS topic connection and session
        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        var session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create a subscriber on the topic
        Topic topic = session.getTopic(username, topicName);
        AQjmsTopicSubscriber subscriber = (AQjmsTopicSubscriber) session.createDurableSubscriber(topic, subscriberName);
        System.out.println("Waiting for messages...");

        // the 1_000 is a one second timeout
        AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000);
        if (message != null) {
            if (message.getText() != null) {
                System.out.println("Dequeued message: " + message.getText());
            } else {
                System.out.println();
            }
        }
        session.commit();

        // clean up, as done on the publisher side
        subscriber.close();
        session.close();
        conn.close();
    }

    public static void main(String[] args) throws AQException, SQLException, JMSException {
        // Create DB connection
        OracleDataSource ds = new OracleDataSource();
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(password);
        Connection connection01 = ds.getConnection();
        if (connection01 != null) {
            System.out.println("Connected!");
        } else {
            System.out.println("Problem Connecting to the database...");
            System.exit(1);
        }

        // Publish a message in my TxEventQ
        PublishTxEventQ(ds);

        // Pause between producer and consumer to allow checking what happened at database level
        System.out.println("Press Enter to continue");
        try {
            System.in.read();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        // Read the message from TxEventQ
        ConsumeTxEventQ(ds);
    }
}
Graphically the result is:

When you are asked to press Enter, this is what you have inside the database:
SQL> SELECT enqueue_time, utl_raw.cast_to_varchar2(userdata_raw) AS userdata_raw FROM txeventq02 ORDER BY enqueue_time DESC; ENQUEUE_TIME USERDATA_RAW ______________________________________ _________________________________ 04-DEC-24 03.30.38.967232000 PM GMT {"id":22,"descr":"twenty-two"} SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$txeventq02 ORDER BY enq_timestamp DESC; MSG_ID MSG_STATE ENQ_TIMESTAMP DEQ_TIMESTAMP CONSUMER_NAME ___________________________________ ____________ __________________________________ ________________ ________________ 00000000000000000000000001660000 READY 04-DEC-24 04.30.38.967232000 PM SUBSCRIBER01 SQL> SELECT * FROM dba_queue_subscribers WHERE owner='YJAQUIER' AND queue_name='TXEVENTQ02'; OWNER QUEUE_NAME QUEUE_TABLE CONSUMER_NAME ADDRESS PROTOCOL TRANSFORMATION RULE DELIVERY_MODE IF_NONDURABLE_SUBSCRIBER QUEUE_TO_QUEUE SUBSCRIBER_ID POS_BITMAP ___________ _____________ ______________ ________________ __________ ___________ _________________ _______ ________________ ___________________________ _________________ ________________ _____________ YJAQUIER TXEVENTQ02 TXEVENTQ02 SUBSCRIBER01 0 PERSISTENT NO FALSE 1 0 |
Once you have pressed enter and the message has been dequeued:
SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$txeventq02 ORDER BY enq_timestamp DESC; MSG_ID MSG_STATE ENQ_TIMESTAMP DEQ_TIMESTAMP CONSUMER_NAME ___________________________________ ____________ __________________________________ __________________________________ ________________ 00000000000000000000000001660000 PROCESSED 04-DEC-24 04.30.38.967232000 PM 04-DEC-24 03.31.29.667238000 PM SUBSCRIBER01 |
Advanced Queueing (AQ) testing
Queue and subscribers creation
I rated the behavior of the TxEventQ feature as a bit cranky: dequeued messages do not disappear after the chosen retention time, and any subscriber can dequeue a message enqueued for another subscriber. I opened a Service Request with My Oracle Support (MOS), but in the end they kept answering: not a bug but a feature. To compare, I decided to test the behavior of the legacy Advanced Queueing (AQ) feature…
Create the AQ table and the queue with:

SET serveroutput ON SIZE unlimited
DECLARE
  queue_properties dbms_aqadm.queue_props_t;
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'aq01',
    queue_payload_type => 'json',
    multiple_consumers => TRUE,
    comment            => 'My AQ');
  -- Intended for testing: delete messages 10 seconds after they have been dequeued.
  -- Note: queue_properties is never passed to create_queue below, so this setting has no effect here
  -- (create_queue has its own retention_time parameter).
  queue_properties.retention_time := 10;
  dbms_aqadm.create_queue(queue_name => 'aq01', queue_table => 'aq01', comment => 'My AQ');
  dbms_aqadm.start_queue(queue_name => 'aq01');
END;
/

PL/SQL procedure successfully completed.

Add two subscribers:

DECLARE
  subscriber01 sys.aq$_agent;
  subscriber02 sys.aq$_agent;
BEGIN
  subscriber01 := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'aq01', subscriber => subscriber01);
  subscriber02 := sys.aq$_agent(name => 'subscriber02', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'aq01', subscriber => subscriber02);
END;
/
Enqueue a message
How to enqueue a message for only one subscriber (play with the script to enqueue for both subscribers):

SET serveroutput ON SIZE unlimited
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            json;
BEGIN
  -- create the message payload
  message := json('{"id": 1, "descr": "Subscriber01"}');
  -- set the consumer name
  --message_properties.correlation := 'subscriber01';
  --message_properties.recipient_list.extend;
  recipients(1) := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0); -- You can add multiple
  --recipients(2) := sys.aq$_agent(name => 'subscriber03', address => null, protocol => 0); -- You can add multiple
  message_properties.recipient_list := recipients;
  -- enqueue the message and display the returned message_id
  dbms_aq.enqueue(
    queue_name         => 'aq01',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);
  dbms_output.put_line('Message enqueued with message_id: ' || message_handle);
  -- commit the transaction
  COMMIT;
END;
/
Message enqueued with message_id: 24ACDAECA9A1C39EE063402B4B0AE324

PL/SQL procedure successfully completed.

If you execute the above code, then execute it again with the whole recipient_list part commented out (and a different payload), you get exactly what is expected (so not the same as with Transactional Event Queues (TxEventQ)):
SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, user_data FROM aq$aq01 ORDER BY enq_timestamp DESC; MSG_ID MSG_STATE ENQ_TIMESTAMP DEQ_TIMESTAMP USER_DATA ___________________________________ ____________ ______________________________ ________________ _____________________________________ 24ACDAECA9A2C39EE063402B4B0AE324 READY 18/10/24 15:25:12,999304000 {"id":2,"descr":"All subscribers"} 24ACDAECA9A2C39EE063402B4B0AE324 READY 18/10/24 15:25:12,999304000 {"id":2,"descr":"All subscribers"} 24ACDAECA9A1C39EE063402B4B0AE324 READY 18/10/24 15:23:40,064909000 {"id":1,"descr":"Subscriber01"} |
Dequeue a message
To dequeue a message from AQ:
SET serveroutput ON SIZE unlimited
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            json;
BEGIN
  -- dequeue_mode determines whether we consume the message or just browse it and leave it there
  dequeue_options.dequeue_mode := dbms_aq.remove;
  -- wait controls how long to wait for a message to arrive before giving up
  dequeue_options.wait := dbms_aq.no_wait;
  -- we must specify navigation so we know where to look in the queue
  dequeue_options.navigation := dbms_aq.first_message;
  -- set the consumer name
  dequeue_options.consumer_name := 'subscriber01';
  -- perform the dequeue
  dbms_aq.dequeue(
    queue_name         => 'aq01',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);
  -- print out the message payload (the buffer must be large enough, TRUNCATE cuts anything beyond it)
  dbms_output.put_line(json_serialize(message RETURNING VARCHAR2(4000) TRUNCATE ERROR ON ERROR));
  -- commit the transaction
  COMMIT;
END;
/

When setting consumer_name to subscriber01 you get two messages and then an error:

{"id":1,"descr":"Subscriber01"}
PL/SQL procedure successfully completed.

{"id":2,"descr":"All subscribers"}
PL/SQL procedure successfully completed.

ORA-25228: timeout or end-of-fetch during message dequeue from YJAQUIER.AQ01

When setting consumer_name to subscriber02 you get a single message and then an error:

{"id":2,"descr":"All subscribers"}
PL/SQL procedure successfully completed.

ORA-25228: timeout or end-of-fetch during message dequeue from YJAQUIER.AQ01

And this time the cleanup is executed immediately (remove option)… So clearly Transactional Event Queues (TxEventQ) has plenty of bugs…
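To make the behavior I expected explicit, here is a toy in-memory model of a multi-consumer queue. It is only a sketch of the semantics observed with AQ in the test above (an explicit recipient list overrides the subscribers, otherwise every subscriber gets its own copy, and dequeuing from an empty queue is an error); the class `MultiConsumerQueue` is mine and says nothing about how Oracle implements it.

```python
from collections import defaultdict, deque


class MultiConsumerQueue:
    """Toy model of the multi-consumer semantics, not Oracle's implementation."""

    def __init__(self):
        self.subscribers = []
        self._pending = defaultdict(deque)  # consumer name -> waiting messages

    def add_subscriber(self, name):
        self.subscribers.append(name)

    def enqueue(self, payload, recipients=None):
        # An explicit recipient list overrides the queue's subscribers
        targets = recipients if recipients else self.subscribers
        for name in targets:
            self._pending[name].append(payload)

    def dequeue(self, consumer):
        # Mirrors the no_wait case: nothing to dequeue is an error (ORA-25228 in AQ)
        if not self._pending[consumer]:
            raise LookupError(f"no message for {consumer}")
        return self._pending[consumer].popleft()


q = MultiConsumerQueue()
q.add_subscriber("subscriber01")
q.add_subscriber("subscriber02")
q.enqueue({"id": 1, "descr": "Subscriber01"}, recipients=["subscriber01"])
q.enqueue({"id": 2, "descr": "All subscribers"})

first = q.dequeue("subscriber01")   # message addressed to subscriber01 only
second = q.dequeue("subscriber01")  # subscriber01's copy of the broadcast message
other = q.dequeue("subscriber02")   # subscriber02 only ever sees the broadcast message
print(first, second, other)
```

In this model subscriber02 can never dequeue the message with id 1, which is the behavior I saw with AQ and did not see with TxEventQ.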
A few queries to check at the database level:

SELECT * FROM aq01 ORDER BY enq_time DESC;
SELECT * FROM aq$aq01 ORDER BY enq_timestamp DESC;
SELECT * FROM v$eq_message_cache;
SELECT * FROM v$eq_subscriber_load;
SELECT * FROM v$eq_dequeue_sessions;

To clean your AQ testing:

BEGIN
  dbms_aqadm.stop_queue(queue_name => 'aq01');
  -- a classic AQ queue is dropped with drop_queue, then its queue table with drop_queue_table
  dbms_aqadm.drop_queue(queue_name => 'aq01');
  dbms_aqadm.drop_queue_table(queue_table => 'aq01');
END;
/

PL/SQL procedure successfully completed.
References
- Developing Event-Driven Microservices in Java, JavaScript, Python, .NET, and Go with AQ/TEQ in the Oracle Database
- Transactional Event Queues (TxEventQ) example in Java
- python-oracledb/samples/raw_aq.py
- Java Message Service for Transactional Event Queues and Advanced Queuing
- Transactional Event Queues (TxEventQ) and Advanced Queuing (AQ)
- Transactional Events Using Oracle TEQ and Apache Kafka