Oracle Advanced Queuing (AQ)
Simple Producer/Consumer
Section titled “Simple Producer/Consumer”Overview
Section titled “Overview”Create a queue that we can send a message to. Oracle will notify our stored procedure that a message has been enqueued and should be worked. We’ll also add some subprograms we can use in an emergency to stop messages from being dequeued, allow dequeuing again, and run a simple batch job to work through all of the messages.
These examples were tested on Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.
Create Queue
Section titled “Create Queue”We will create a message type, a queue table that can hold the messages, and a queue. Messages in the queue will be dequeued first by priority, then by their enqueue time. If anything goes wrong working the message and the dequeue is rolled back, AQ will make the message available for dequeue 3600 seconds later. It will do this 48 times before moving it to an exception queue.
-- Payload type carried by every message placed on the queue.
create type message_t as object (
  sender  varchar2(50),
  message varchar2(512)
);
/
-- Type MESSAGE_T compiled

-- Queue table that stores MESSAGE_T payloads for a single-consumer queue.
-- Messages are ordered by priority first, then by enqueue time.
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'MESSAGE_Q_TBL',
    queue_payload_type => 'MESSAGE_T',
    sort_list          => 'PRIORITY,ENQ_TIME',
    multiple_consumers => false,
    compatible         => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.

-- The queue itself. A message whose dequeue is rolled back becomes available
-- again after retry_delay seconds; after max_retries attempts it is moved to
-- an exception queue.
begin
  dbms_aqadm.create_queue(
    queue_name          => 'MESSAGE_Q',
    queue_table         => 'MESSAGE_Q_TBL',
    queue_type          => dbms_aqadm.normal_queue, -- same value as the literal 0
    max_retries         => 48,
    retry_delay         => 3600,
    dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Now that we have a place to put the messages lets create a package to manage and work messages in the queue.
-- Public interface for managing and working the messages in MESSAGE_Q.
create or replace package message_worker_pkg
is
  -- name of the queue every subprogram in this package operates on
  queue_name_c constant varchar2(20) := 'MESSAGE_Q';

  -- allows the workers to process messages in the queue
  procedure enable_dequeue;

  -- prevents messages from being worked but will still allow them to be
  -- created and enqueued
  procedure disable_dequeue;

  -- called only by Oracle Advanced Queueing. Do not call anywhere else.
  -- The parameter list is the signature AQ uses when it invokes a PL/SQL
  -- notification callback.
  procedure on_message_enqueued (context  in raw,
                                 reginfo  in sys.aq$_reg_info,
                                 descr    in sys.aq$_descriptor,
                                 payload  in raw,
                                 payloadl in number);

  -- allows messages to be worked if we missed the notification (or a retry
  -- is pending)
  procedure work_old_messages;
end message_worker_pkg;
/
-- Implementation of the queue management and message working subprograms.
create or replace package body message_worker_pkg
is
  -- ORA-25228: raised by Oracle when we try to dequeue but no more messages
  -- are ready to be dequeued at this moment
  no_more_messages_ex exception;
  pragma exception_init (no_more_messages_ex, -25228);

  -- ORA-25263: no message in the queue with the requested message id.
  -- NOTE(review): expected when a dequeue by msgid targets a message that has
  -- already been removed -- confirm against the Oracle error reference.
  message_gone_ex exception;
  pragma exception_init (message_gone_ex, -25263);

  -- allows the workers to process messages in the queue
  procedure enable_dequeue
  as
  begin
    -- enqueue => false leaves the enqueue setting as it is, mirroring
    -- disable_dequeue; the default (true) would also switch enqueuing on.
    dbms_aqadm.start_queue (queue_name => queue_name_c,
                            enqueue    => false,
                            dequeue    => true);
  end enable_dequeue;

  -- prevents messages from being worked but will still allow them to be
  -- created and enqueued
  procedure disable_dequeue
  as
  begin
    dbms_aqadm.stop_queue (queue_name => queue_name_c,
                           dequeue    => true,
                           enqueue    => false);
  end disable_dequeue;

  -- does the actual work for one message; here it only prints
  -- "<sender> says <message>" through dbms_output
  procedure work_message (message_in in out nocopy message_t)
  as
  begin
    dbms_output.put_line (message_in.sender || ' says ' || message_in.message);
  end work_message;

  -- called only by Oracle Advanced Queueing. Do not call anywhere else.
  -- Dequeues exactly the message named in the notification descriptor, works
  -- it and commits, all inside its own autonomous transaction.
  procedure on_message_enqueued (context  in raw,
                                 reginfo  in sys.aq$_reg_info,
                                 descr    in sys.aq$_descriptor,
                                 payload  in raw,
                                 payloadl in number)
  as
    pragma autonomous_transaction;
    dequeue_options_l    dbms_aq.dequeue_options_t;
    message_id_l         raw (16);
    message_l            message_t;
    message_properties_l dbms_aq.message_properties_t;
  begin
    -- target the one message this notification is about and never block
    dequeue_options_l.msgid         := descr.msg_id;
    dequeue_options_l.consumer_name := descr.consumer_name;
    dequeue_options_l.wait          := dbms_aq.no_wait;

    dbms_aq.dequeue (queue_name         => descr.queue_name,
                     dequeue_options    => dequeue_options_l,
                     message_properties => message_properties_l,
                     payload            => message_l,
                     msgid              => message_id_l);
    work_message (message_l);
    commit;
  exception
    when no_more_messages_ex or message_gone_ex then
      -- it's possible work_old_messages already dequeued the message
      commit;
    when others then
      -- since this is called by AQ, throwing the exception back to it will
      -- have it put the message back on the queue and retry later. The
      -- rollback makes the undo of the dequeue explicit before re-raising.
      rollback;
      raise;
  end on_message_enqueued;

  -- allows messages to be worked if we missed the notification (or a retry
  -- is pending). Works and commits one message at a time until the queue
  -- reports that nothing more is ready.
  procedure work_old_messages
  as
    pragma autonomous_transaction;
    dequeue_options_l    dbms_aq.dequeue_options_t;
    message_id_l         raw (16);
    message_l            message_t;
    message_properties_l dbms_aq.message_properties_t;
  begin
    dequeue_options_l.wait       := dbms_aq.no_wait;
    dequeue_options_l.navigation := dbms_aq.first_message;

    loop -- way out is no_more_messages_ex
      dbms_aq.dequeue (queue_name         => queue_name_c,
                       dequeue_options    => dequeue_options_l,
                       message_properties => message_properties_l,
                       payload            => message_l,
                       msgid              => message_id_l);
      work_message (message_l);
      commit;
    end loop;
  exception
    when no_more_messages_ex then
      -- end the autonomous transaction before returning, as
      -- on_message_enqueued does; every worked message is already committed
      commit;
    when others then
      -- undo the dequeue of the message that failed, then report the error
      rollback;
      raise;
  end work_old_messages;
end message_worker_pkg;
/
Next tell AQ that when a message is enqueued to MESSAGE_Q (and committed) notify our procedure it has work to do. AQ will start up a job in its own session to handle this.
-- Subscribe message_worker_pkg.on_message_enqueued to notifications for the
-- queue owned by the current user.
declare
  -- schema-qualified name of the queue to watch
  subscription_l constant varchar2(300) :=
    user || '.' || message_worker_pkg.queue_name_c;
  -- PL/SQL callback to run for each notification
  callback_l constant varchar2(300) :=
    'plsql://' || user || '.message_worker_pkg.on_message_enqueued';
  registrations_l sys.aq$_reg_info_list;
begin
  registrations_l := sys.aq$_reg_info_list (
    sys.aq$_reg_info (subscription_l,
                      dbms_aq.namespace_aq,
                      callback_l,
                      hextoraw ('FF')));

  -- second argument is the number of entries in the list
  dbms_aq.register (registrations_l, 1);
  commit;
end;
/
Start Queue and Send a Message
Section titled “Start Queue and Send a Message”
-- Starts the queue for both enqueue and dequeue, then sends one message.
declare
  enqueue_options_l    dbms_aq.enqueue_options_t;
  message_properties_l dbms_aq.message_properties_t;
  message_id_l         raw (16);
  message_l            message_t;
begin
  -- only need to do this next line ONCE
  dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c,
                          enqueue    => true,
                          dequeue    => true);

  message_l := new message_t ('Jon', 'Hello, world!');
  dbms_aq.enqueue (queue_name         => message_worker_pkg.queue_name_c,
                   enqueue_options    => enqueue_options_l,
                   message_properties => message_properties_l,
                   payload            => message_l,
                   msgid              => message_id_l);
  -- the message only becomes visible to the notification once committed
  commit;
end;
/