Transactional Event Queues (TxEventQ) hands-on

Preamble

Transactional Event Queues (TxEventQ) and Advanced Queuing (AQ) are queuing systems built into the Oracle database. Both features are included at no extra cost in all Oracle database editions, and TxEventQ also includes a limited license for Oracle database partitioning. Such queuing systems are nowadays extremely popular with web, mobile, IoT, and other data-driven or event-driven applications: producers enqueue messages and consumers dequeue them. Apache Kafka is one of the best-known solutions in this area…

From what I have read TxEventQ is an evolution of AQ and is the way to go:

Transactional Event Queues (TxEventQ) is a highly optimized implementation of AQ previously called AQ Sharded Queues.

TxEventQ is also the only one of the two that can be used with the Oracle Kafka Java client, but this is out of scope for this blog post and I will cover it in another one…

Oracle's recommendation on which technology to use is:

Message type           Old name             New name
Persistent messages    AQ classic queues    AQ queues
Persistent messages    AQ Sharded queues    TxEventQ queues
Buffered messages      AQ classic queues    AQ buffered queues

Compared with an Apache Kafka architecture, TxEventQ leverages all the features of an Oracle database: high availability, scalability, reliability, and messages stored in SQL-accessible objects…

The big picture, inspired by the Oracle documentation, is the following (you can transform messages while enqueuing and dequeuing; check the DBA_TRANSFORMATIONS view and the DBMS_TRANSFORM package to see how):

[Figure txeventq01: TxEventQ big picture]

I have tested TxEventQ and AQ with Oracle Enterprise Edition 21c (21.16) running on a server with 12 cores and 64GB of memory under Red Hat Enterprise Linux release 8.7 (Ootpa).

From what I have seen, even in the latest release available at the time of writing, I rate the feature as not mature enough for production use: it still lacks basics that worked perfectly with AQ!

Transactional Event Queues (TxEventQ) creation

Create the TxEventQ with:

DECLARE
  queue_properties dbms_aqadm.queue_props_t;
BEGIN
  queue_properties.retention_time := 10;  -- For testing purposes, delete messages 10 seconds after they have been dequeued
  dbms_aqadm.create_transactional_event_queue(queue_name => 'txeventq01',
    multiple_consumers => TRUE,
    COMMENT => 'My TxEventQ',
    queue_payload_type => 'json',
    queue_properties => queue_properties);
  dbms_aqadm.start_queue(queue_name => 'txeventq01');
END;
/
 
PL/SQL procedure successfully completed.

Add one or more subscribers to your TxEventQ queue; I ran into many issues whenever I forgot this step:

DECLARE
  subscriber01 sys.aq$_agent;
BEGIN
  subscriber01 := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'txeventq01', subscriber => subscriber01);
END;
/
 
PL/SQL procedure successfully completed.

You can use the DBA views to check that everything has been created:

SQL> SELECT * FROM dba_queues WHERE owner='YJAQUIER';
 
OWNER       NAME          QUEUE_TABLE         QID QUEUE_TYPE         MAX_RETRIES    RETRY_DELAY ENQUEUE_ENABLED    DEQUEUE_ENABLED    RETENTION    USER_COMMENT    NETWORK_NAME    SHARDED    QUEUE_CATEGORY               RECIPIENTS
___________ _____________ ______________ ________ _______________ ______________ ______________ __________________ __________________ ____________ _______________ _______________ __________ ____________________________ _____________
YJAQUIER    TXEVENTQ01    TXEVENTQ01        77708 NORMAL_QUEUE                 5              0   YES                YES              10           My TxEventQ                     TRUE       Transactional Event Queue    MULTIPLE
 
SQL> SELECT * FROM dba_queue_subscribers WHERE owner='YJAQUIER';
 
OWNER       QUEUE_NAME    QUEUE_TABLE    CONSUMER_NAME    ADDRESS       PROTOCOL TRANSFORMATION    RULE                            DELIVERY_MODE    IF_NONDURABLE_SUBSCRIBER    QUEUE_TO_QUEUE       SUBSCRIBER_ID    POS_BITMAP
___________ _____________ ______________ ________________ __________ ___________ _________________ _______________________________ ________________ ___________________________ _________________ ________________ _____________
YJAQUIER    TXEVENTQ01    TXEVENTQ01     SUBSCRIBER01                          0                                                   PERSISTENT       NO                          FALSE                            1             0

When testing you can clean everything with:

BEGIN
  dbms_aqadm.stop_queue(queue_name => 'txeventq01');  
  dbms_aqadm.drop_transactional_event_queue(queue_name => 'txeventq01');
END;
/
 
PL/SQL procedure successfully completed.

You can also just flush the queue content with:

DECLARE
  purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
  purge_options.block := FALSE;
  purge_options.delivery_mode := dbms_aq.persistent;
  dbms_aqadm.purge_queue_table(queue_table => 'txeventq01', purge_condition => NULL, purge_options => purge_options);
END;
/

Enqueue/dequeue testing with PL/SQL

Enqueue a message

SET serveroutput ON SIZE unlimited
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            json;
BEGIN
-- create the message payload
message:=json('{"id": 1, "descr": "Subscriber01"}');
 
-- set the consumer name
--message_properties.correlation := 'subscriber01';
--message_properties.recipient_list.extend;
recipients(1) := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0); -- You can add multiple
--recipients(2) := sys.aq$_agent(name => 'subscriber02', address => null, protocol => 0); -- You can add multiple
message_properties.recipient_list := recipients;
 
-- enqueue the message and display the returned message_id
dbms_aq.enqueue(queue_name => 'txeventq01', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle);
dbms_output.put_line('Message enqueued with message_id: ' || message_handle);
 
-- commit the transaction
COMMIT;
END;
/
 
 
Message enqueued with message_id: 00000000000000000200000004660000

PL/SQL procedure successfully completed.

Remark:
I have kept the multiple attempts I made as comments. Trying to enqueue with DBMS_AQ.ENQUEUE a payload that is not of JMS type either does not work or fails outright with an ORA-00600 error:

ERROR at line 1:
ORA-00600: internal error code, arguments: [FILE:psdmsc.c LINE:1262
ID:OCIKCallPush], [], [], [], [], [], [], [], [], [], [], []
ORA-06512: at "SYS.DBMS_AQ", line 202
ORA-06512: at line 18

Dequeue a message

SET serveroutput ON SIZE unlimited
DECLARE
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message json;
BEGIN
  -- dequeue_mode determines whether we will consume the message or just browse it and leave it there
  dequeue_options.dequeue_mode  := dbms_aq.remove;
 
  -- wait controls how long to wait for a message to arrive before giving up
  dequeue_options.wait          := dbms_aq.no_wait;
 
  -- we must specify navigation so we know where to look in the TEQ
  dequeue_options.navigation    := dbms_aq.first_message;          
 
  -- set the consumer name 
  dequeue_options.consumer_name := 'subscriber01';
 
  -- perform the dequeue
  dbms_aq.dequeue(queue_name => 'txeventq01', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle);
 
  -- print out the message payload
  dbms_output.put_line(json_serialize(message RETURNING VARCHAR2(4000) TRUNCATE ERROR ON ERROR));
 
  -- commit the transaction
  COMMIT;
END;
/
 
{"id":1,"descr":"One"}
 
PL/SQL procedure successfully completed.

A few queries to check at database level what happened:

SELECT * FROM aq$txeventq01 ORDER BY enq_timestamp DESC;
SELECT * FROM txeventq01 ORDER BY enqueue_time DESC;
 
SELECT * FROM v$eq_message_cache;
 
SELECT * FROM v$eq_subscriber_load;
 
SELECT * FROM v$eq_dequeue_sessions;

Enqueue/dequeue testing with Python

For Python virtual environments I use Miniconda. To comply with the new Conda licensing model, first add the conda-forge channel and remove the default repository with:

$ conda config --add channels conda-forge
$ conda config --remove channels defaults

And I have created a dedicated environment with Python 3.13 using:

$ conda create -n "oracle" python=3.13

Once in this environment, install the oracledb Python package (and num2words, which the enqueue script below imports) with:

$ pip install oracledb num2words

The python-oracledb documentation states that Advanced Queuing can only be used in Thick mode:

Oracle Advanced Queuing is only supported in the python-oracledb Thick mode. See Enabling python-oracledb Thick mode.

I have used Instant Client 23.5, installed in C:\Program Files\instantclient_23_5, so the scripts below start by pointing python-oracledb at that directory…

Enqueue a message

import oracledb
import os
import platform
from num2words import num2words
import random
 
d = None                               # On Linux, no directory should be passed
if platform.system() == "Darwin":      # macOS
  d = os.path.expanduser("~") + "/Downloads/instantclient_23_3"
elif platform.system() == "Windows":   # Windows
  d = r"C:\Program Files\instantclient_23_5"
oracledb.init_oracle_client(lib_dir=d)
 
connection = oracledb.connect(dsn='euls20092.sgp.st.com:1531/pdb1',user='yjaquier',password='secure_password')
 
# sql = "select user_data from yjaquier.txeventq01 order by enqueue_time desc"
 
# with connection.cursor() as cursor:
#   print("Get all rows via an iterator")
#   for result in cursor.execute(sql):
#     print(result)
 
queue = connection.queue("TXEVENTQ01", "JSON")
# queue = connection.queue("AQ01", "JSON")
 
i = random.randint(1,100)
 
json_data = [dict(id = i, descr = num2words(i).capitalize())]
 
for data in json_data:
  message_property = connection.msgproperties(recipients = ["subscriber01", "subscriber02"], payload = data)
  queue.enqone(message_property)
  print('Payload: ' + str(data))
  print('Message enqueued with message_id: ' + message_property.msgid.hex())
 
connection.commit()

When I execute:

$ python oracle_enqueue.py
Payload: {'id': 89, 'descr': 'Eighty-nine'}
Message enqueued with message_id: 2589152b4f8cdb5fe063402b4b0ad02d
(oracle)
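
The Thick mode initialisation at the top of the script can be factored into a small function so that each script does not have to repeat it. This is only a minimal sketch, assuming the same install locations as above; the function name instant_client_dir and its home parameter are hypothetical names and not part of python-oracledb:

```python
import os
from typing import Optional


def instant_client_dir(system: str, home: Optional[str] = None) -> Optional[str]:
    """Return the directory to pass as lib_dir, or None when none is needed."""
    if system == "Darwin":  # macOS: Instant Client unpacked under ~/Downloads
        base = home if home is not None else os.path.expanduser("~")
        return base + "/Downloads/instantclient_23_3"
    if system == "Windows":
        return r"C:\Program Files\instantclient_23_5"
    return None  # Linux: no directory should be passed


# Usage (requires python-oracledb and an Instant Client installation):
#   import platform, oracledb
#   oracledb.init_oracle_client(lib_dir=instant_client_dir(platform.system()))
```

Taking the platform name as a parameter, instead of calling platform.system() inside the function, keeps it easy to check on any machine.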

Dequeue a message

import oracledb
import os
import platform
 
d = None                               # On Linux, no directory should be passed
if platform.system() == "Darwin":      # macOS
  d = os.path.expanduser("~") + "/Downloads/instantclient_23_3"
elif platform.system() == "Windows":   # Windows
  d = r"C:\Program Files\instantclient_23_5"
oracledb.init_oracle_client(lib_dir=d)
 
connection = oracledb.connect(dsn='euls20092.sgp.st.com:1531/pdb1',user='yjaquier',password='secure_password')
 
# sql = "select user_data from yjaquier.txeventq01"
 
# with connection.cursor() as cursor:
#   print("Get all rows via an iterator")
#   for result in cursor.execute(sql):
#     print(result)
 
queue = connection.queue("TXEVENTQ01", "JSON")
# queue = connection.queue("AQ01", "JSON")
 
queue.deqoptions.consumername = "subscriber01"
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.mode = oracledb.DEQ_REMOVE

message = queue.deqone()

if message:
  print(message.payload)
else:
  print("No message to dequeue")
 
connection.commit()

When I execute:

$ python oracle_dequeue.py
{'id': Decimal('89'), 'descr': 'Eighty-nine'}
(oracle)
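
Note that the JSON number comes back as a Python Decimal, which the standard json module refuses to serialize. If the dequeued payload has to be turned back into JSON text, one possible approach is a default hook for json.dumps. This is a sketch only; the helper name decimal_default is hypothetical and the payload is hard-coded to match the output above:

```python
import json
from decimal import Decimal


def decimal_default(value):
    """json.dumps hook: Decimal becomes int when integral, float otherwise."""
    if isinstance(value, Decimal):
        if value.is_finite() and value == value.to_integral_value():
            return int(value)
        return float(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# Same shape as the payload printed by the dequeue script
payload = {"id": Decimal("89"), "descr": "Eighty-nine"}
text = json.dumps(payload, default=decimal_default)
print(text)
```

Converting to float loses precision for large or highly fractional values, so this is only reasonable for simple identifiers and counters like the ones used here.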

Enqueue/dequeue testing with Java

Errors encountered

While developing my code I first hit unhelpful dependency errors such as this one, raised when the Jakarta Transaction API has not been added to the project:

Caused by: java.lang.ClassNotFoundException: jakarta.transaction.Synchronization

The dependencies I ended up with:

[Screenshot txeventq02: project dependencies]

I finally got this strange error:

Exception in thread "main" oracle.jakarta.jms.AQjmsException: JMS-190: Queue yjaquier.txeventq01 not found

As I was 100% sure that the owner and queue names were right, it took me a while to work out that Oracle Java Message Service (Oracle JMS) expects a queue with the standard JMS payload and not a queue with a JSON payload:

Oracle Java Message Service (Oracle JMS) supports the standard JMS interfaces and has extensions to support administrative operations and other features that are not a part of the standard.

In the end, I created a txeventq02 TxEventQ with the default (JMS) message type:

DECLARE
  queue_properties dbms_aqadm.queue_props_t;
  subscriber01 sys.aq$_agent;
BEGIN
  queue_properties.retention_time := 10;  -- For testing purposes, delete messages 10 seconds after they have been dequeued
  dbms_aqadm.create_transactional_event_queue(queue_name => 'txeventq02',
    multiple_consumers => TRUE,
    COMMENT => 'My TxEventQ',
    queue_payload_type => dbms_aqadm.jms_type, -- default value
    queue_properties => queue_properties);
  dbms_aqadm.start_queue(queue_name => 'txeventq02');
  subscriber01 := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'txeventq02', subscriber => subscriber01);
END;
/

The queue tables are quite different:

SQL> DESC yjaquier.txeventq01
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 MSGID                                              RAW(16)
 SHARD                                              NUMBER(38)
 SEQ_NUM                                            NUMBER(38)
 CORRELATION                                        VARCHAR2(128)
 ENQUEUE_TIME                                       TIMESTAMP(6) WITH TIME ZONE
 PRIORITY                                           NUMBER(38)
 DELIVERY_TIME                                      TIMESTAMP(6) WITH TIME ZONE
 EXPIRATION                                         TIMESTAMP(6) WITH TIME ZONE
 STEP_NUMBER                                        NUMBER(38)
 STATE                                              NUMBER(38)
 SUBSHARD                                           NUMBER(38)
 SUBSCRIBER_MAP                                     RAW(1024)
 OLD_MSGID                                          RAW(16)
 EXCEPTION_QUEUE                                    VARCHAR2(128)
 USER_DATA                                          JSON
 
SQL> DESC yjaquier.txeventq02
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 MSGID                                              RAW(16)
 SHARD                                              NUMBER(38)
 SEQ_NUM                                            NUMBER(38)
 CORRELATION                                        VARCHAR2(128)
 ENQUEUE_TIME                                       TIMESTAMP(6) WITH TIME ZONE
 PRIORITY                                           NUMBER(38)
 DELIVERY_TIME                                      TIMESTAMP(6) WITH TIME ZONE
 EXPIRATION                                         TIMESTAMP(6) WITH TIME ZONE
 STEP_NUMBER                                        NUMBER(38)
 STATE                                              NUMBER(38)
 SUBSHARD                                           NUMBER(38)
 SUBSCRIBER_MAP                                     RAW(1024)
 OLD_MSGID                                          RAW(16)
 EXCEPTION_QUEUE                                    VARCHAR2(128)
 JMS_MSG_TYPE                                       NUMBER
 HEADER_PROPERTIES                                  VARCHAR2(4000)
 USER_PROPERTIES_TEXT                               VARCHAR2(4000)
 USER_PROPERTIES_CLOB                               CLOB
 USERDATA_RAW                                       RAW(2000)
 USERDATA_BLOB                                      BLOB

Enqueue and dequeue a message

My Java code is:

package txeventq01;
 
import java.sql.Connection;
import java.sql.SQLException;
 
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
 
import oracle.AQ.AQException;
import oracle.jakarta.jms.AQjmsAgent;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.jakarta.jms.AQjmsSession;
import oracle.jakarta.jms.AQjmsTextMessage;
import oracle.jakarta.jms.AQjmsTopicPublisher;
import oracle.jakarta.jms.AQjmsTopicSubscriber;
import oracle.jdbc.pool.OracleDataSource;
 
import java.util.Random;
import pl.allegro.finance.tradukisto.ValueConverters;
 
public class txeventq01 {
  private static final String username = "yjaquier";
  private static final String password = "secure_password";
  private static final String url = "jdbc:oracle:thin:@//server01.domain.com:1531/pdb1";
  private static final String topicName = "txeventq02";
  private static final String subscriberName = "subscriber01";
 
  private static void PublishTxEventQ(OracleDataSource ds) throws AQException, SQLException, JMSException {
    Random rand = new Random();
    ValueConverters intConverter = ValueConverters.ENGLISH_INTEGER;
 
    // create a JMS topic connection and session
    TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
    TopicConnection conn = tcf.createTopicConnection();
    conn.start();
    var session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
    // publish message
    Topic topic = session.getTopic(username, topicName);
 
    AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(topic);
 
    int i = rand.nextInt(1000)+1;
    String baseMsg = "{\"id\":" + i + ",\"descr\":\"" + intConverter.asWords(i) + "\"}";
    System.out.println("Enqueued message: " + baseMsg);
    AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage(baseMsg);
    publisher.publish(message, new AQjmsAgent[]{new AQjmsAgent(subscriberName, null)});
    session.commit();
 
    // clean up
    publisher.close();
    session.close();
    conn.close();
  }
 
  private static void ConsumeTxEventQ(OracleDataSource ds) throws AQException, SQLException, JMSException {
    // create a JMS topic connection and session
    TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
    TopicConnection conn = tcf.createTopicConnection();
    conn.start();
    var session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
    // create a subscriber on the topic
    Topic topic = session.getTopic(username, topicName);
    AQjmsTopicSubscriber subscriber = (AQjmsTopicSubscriber) session.createDurableSubscriber(topic, subscriberName);
 
    System.out.println("Waiting for messages...");
 
    // wait up to one second (1_000 ms) for a message
    AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000);
    if (message != null && message.getText() != null) {
      System.out.println("Dequeued message: " + message.getText());
    } else {
      System.out.println("No message received");
    }
    session.commit();

    // clean up
    subscriber.close();
    session.close();
    conn.close();
  }
 
  public static void main(String[] args) throws AQException, SQLException, JMSException {
    // Create DB connection
    OracleDataSource ds = new OracleDataSource();
    ds.setURL(url);
    ds.setUser(username);
    ds.setPassword(password);
    Connection connection01 = ds.getConnection();
    if (connection01 != null) {
      System.out.println("Connected!");
    } else {
      System.out.println("Problem Connecting to the database...");
      System.exit(1);
    }
 
    // Publish a message in my TxEventQ
    PublishTxEventQ(ds);
 
    // Pause between produce and consumer to allow to check what happened at database level
    System.out.println("Press Enter to continue");
    try {
      System.in.read();
    }
    catch (Exception e) {
      System.out.println(e.getMessage());
    }
    // Reading the message from TxEventQ
    ConsumeTxEventQ(ds);
  }
}

Graphically the result is:

[Screenshot txeventq03: result of running the program]

When you are asked to press Enter, this is what the database contains:

SQL> SELECT enqueue_time, utl_raw.cast_to_varchar2(userdata_raw) AS userdata_raw FROM txeventq02 ORDER BY enqueue_time DESC;
 
ENQUEUE_TIME                           USERDATA_RAW
______________________________________ _________________________________
04-DEC-24 03.30.38.967232000 PM GMT    {"id":22,"descr":"twenty-two"}
 
SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$txeventq02 ORDER BY enq_timestamp DESC;
 
MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                      DEQ_TIMESTAMP    CONSUMER_NAME
___________________________________ ____________ __________________________________ ________________ ________________
00000000000000000000000001660000    READY        04-DEC-24 04.30.38.967232000 PM                     SUBSCRIBER01
 
SQL> SELECT * FROM dba_queue_subscribers WHERE owner='YJAQUIER' AND queue_name='TXEVENTQ02';
 
OWNER       QUEUE_NAME    QUEUE_TABLE    CONSUMER_NAME    ADDRESS       PROTOCOL TRANSFORMATION    RULE    DELIVERY_MODE    IF_NONDURABLE_SUBSCRIBER    QUEUE_TO_QUEUE       SUBSCRIBER_ID    POS_BITMAP
___________ _____________ ______________ ________________ __________ ___________ _________________ _______ ________________ ___________________________ _________________ ________________ _____________
YJAQUIER    TXEVENTQ02    TXEVENTQ02     SUBSCRIBER01                          0                           PERSISTENT       NO                          FALSE                            1             0

Once you have pressed enter and the message has been dequeued:

SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, consumer_name FROM aq$txeventq02 ORDER BY enq_timestamp DESC;
 
MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                      DEQ_TIMESTAMP                      CONSUMER_NAME
___________________________________ ____________ __________________________________ __________________________________ ________________
00000000000000000000000001660000    PROCESSED    04-DEC-24 04.30.38.967232000 PM    04-DEC-24 03.31.29.667238000 PM    SUBSCRIBER01

Advanced Queuing (AQ) testing

Queue and subscribers creation

I rate the behavior of the TxEventQ feature as a bit cranky: dequeued messages do not disappear after the chosen retention time, and any subscriber can dequeue a message enqueued for another subscriber. I opened a Service Request with My Oracle Support (MOS), but in the end they kept answering that this is not a bug but a feature. For comparison, I decided to test the behavior of the legacy Advanced Queuing (AQ) feature…

Create the AQ table and the queue with:

SET serveroutput ON SIZE unlimited
DECLARE
  queue_properties dbms_aqadm.queue_props_t;
BEGIN
  dbms_aqadm.create_queue_table(queue_table => 'aq01', queue_payload_type => 'json', multiple_consumers => TRUE, COMMENT => 'My AQ');
  queue_properties.retention_time := 10;  -- Intended retention of 10 seconds; note that queue_properties is never passed to create_queue below, so the default retention_time of 0 applies
  dbms_aqadm.create_queue(queue_name => 'aq01',
    queue_table => 'aq01',
	  COMMENT => 'My AQ');
  dbms_aqadm.start_queue(queue_name => 'aq01');
END;
/
 
PL/SQL procedure successfully completed.

Add two subscribers:

DECLARE
  subscriber01 sys.aq$_agent;
  subscriber02 sys.aq$_agent;
BEGIN
  subscriber01 := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'aq01', subscriber => subscriber01);
  subscriber02 := sys.aq$_agent(name => 'subscriber02', address => NULL, protocol => 0);
  dbms_aqadm.add_subscriber(queue_name => 'aq01', subscriber => subscriber02);
END;
/

Enqueue a message

How to enqueue a message for only one subscriber (adapt the script to enqueue for both subscribers):

SET serveroutput ON SIZE unlimited
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            json;
BEGIN
-- create the message payload
message:=json('{"id": 1, "descr": "Subscriber01"}');
 
-- set the consumer name
--message_properties.correlation := 'subscriber01';
--message_properties.recipient_list.extend;
recipients(1) := sys.aq$_agent(name => 'subscriber01', address => NULL, protocol => 0); -- You can add multiple
--recipients(2) := sys.aq$_agent(name => 'subscriber03', address => null, protocol => 0); -- You can add multiple
message_properties.recipient_list := recipients;
 
-- enqueue the message and display the returned message_id
dbms_aq.enqueue(queue_name => 'aq01', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle);
dbms_output.put_line('Message enqueued with message_id: ' || message_handle);
 
-- commit the transaction
COMMIT;
END;
/
 
Message enqueued with message_id: 24ACDAECA9A1C39EE063402B4B0AE324

PL/SQL procedure successfully completed.

If you execute the above code, then execute it again with the whole recipient_list part commented out, you get exactly what is expected (so not the same as with Transactional Event Queues (TxEventQ)):

SQL> SELECT msg_id, msg_state, enq_timestamp, deq_timestamp, user_data FROM aq$aq01 ORDER BY enq_timestamp DESC;
 
MSG_ID                              MSG_STATE    ENQ_TIMESTAMP                  DEQ_TIMESTAMP    USER_DATA
___________________________________ ____________ ______________________________ ________________ _____________________________________
24ACDAECA9A2C39EE063402B4B0AE324    READY        18/10/24 15:25:12,999304000                     {"id":2,"descr":"All subscribers"}
24ACDAECA9A2C39EE063402B4B0AE324    READY        18/10/24 15:25:12,999304000                     {"id":2,"descr":"All subscribers"}
24ACDAECA9A1C39EE063402B4B0AE324    READY        18/10/24 15:23:40,064909000                     {"id":1,"descr":"Subscriber01"}

Dequeue a message

To dequeue a message from AQ:

SET serveroutput ON SIZE unlimited
DECLARE
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message json;
BEGIN
  -- dequeue_mode determines whether we will consume the message or just browse it and leave it there
  dequeue_options.dequeue_mode  := dbms_aq.remove;
 
  -- wait controls how long to wait for a message to arrive before giving up
  dequeue_options.wait          := dbms_aq.no_wait;
 
  -- we must specify navigation so we know where to look in the queue
  dequeue_options.navigation    := dbms_aq.first_message;          
 
  -- set the consumer name 
  dequeue_options.consumer_name := 'subscriber01';
 
  -- perform the dequeue
  dbms_aq.dequeue(queue_name => 'aq01', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle);
 
  -- print out the message payload
  dbms_output.put_line(json_serialize(message RETURNING VARCHAR2(4000) TRUNCATE ERROR ON ERROR));
 
  -- commit the transaction
  COMMIT;
END;
/

When setting consumer_name to subscriber01 you get two messages and then an error:

{"id":1,"descr":"Subscriber01"}

PL/SQL procedure successfully completed.

{"id":2,"descr":"All subscribers"}

PL/SQL procedure successfully completed.

ORA-25228: timeout or end-of-fetch during message dequeue from YJAQUIER.AQ01

When setting consumer_name to subscriber02 you get one single message and then an error:

{"id":2,"descr":"All subscribers"}

PL/SQL procedure successfully completed.

ORA-25228: timeout or end-of-fetch during message dequeue from YJAQUIER.AQ01

And this time the cleanup is executed immediately (remove option)… So clearly Transactional Event Queues (TxEventQ) has plenty of bugs…
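
To make the delivery rules I expected explicit, here is a toy in-memory model in Python. It only illustrates the AQ behavior observed above: a message with a recipient list reaches only those recipients, a message without one reaches every subscriber, and a dequeue in remove mode consumes the consumer's copy. The class ToyMultiConsumerQueue and its methods are made up for this sketch and say nothing about how Oracle implements either feature:

```python
from collections import deque


class ToyMultiConsumerQueue:
    """In-memory illustration only; not how Oracle implements AQ or TxEventQ."""

    def __init__(self, subscribers):
        # One pending list per subscriber name
        self.pending = {name: deque() for name in subscribers}

    def enqueue(self, payload, recipients=None):
        # Without a recipient list the message goes to every subscriber
        targets = recipients if recipients is not None else list(self.pending)
        for name in targets:
            self.pending.setdefault(name, deque()).append(payload)

    def dequeue(self, consumer):
        # Mimics no_wait + remove: return None when nothing is left
        inbox = self.pending.get(consumer)
        return inbox.popleft() if inbox else None


queue = ToyMultiConsumerQueue(["subscriber01", "subscriber02"])
queue.enqueue({"id": 1, "descr": "Subscriber01"}, recipients=["subscriber01"])
queue.enqueue({"id": 2, "descr": "All subscribers"})

for consumer in ("subscriber01", "subscriber02"):
    while (message := queue.dequeue(consumer)) is not None:
        print(consumer, message)
    print(consumer, "-> nothing left (ORA-25228 with the real queue)")
```

In this model subscriber01 receives both messages and subscriber02 only the second one, which is what the AQ test above returns.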

A few queries to check at database level:

SELECT * FROM aq01 ORDER BY enq_time DESC;
SELECT * FROM aq$aq01 ORDER BY enq_timestamp DESC;
 
SELECT * FROM v$eq_message_cache;
 
SELECT * FROM v$eq_subscriber_load;
 
SELECT * FROM v$eq_dequeue_sessions;

To clean your AQ testing:

BEGIN
  dbms_aqadm.stop_queue(queue_name => 'aq01');
  dbms_aqadm.drop_queue(queue_name => 'aq01');
  dbms_aqadm.drop_queue_table(queue_table => 'aq01');
END;
/
 
PL/SQL procedure successfully completed.
