Oracle® Streams Advanced Queuing User's Guide and Reference Release 10.1 Part Number B10785-01 |
|
|
View PDF |
This chapter describes the prerequisites for using Oracle Streams Advanced Queuing (AQ). It discusses planning and design issues and includes several frequently asked questions about Oracle Streams AQ.
This chapter contains the following topics:
Oracle Streams AQ prerequisites depend on:
Your operating environment and programing languages
How structured your data is
Messaging Requirements
What your source and target systems are, in other words where you are sending your messages to and from.
Oracle Streams AQ is provided with Oracle Database 10g.
This section provides examples of Oracle Streams Advanced Queuing (AQ) operations using different programmatic environments.
This section contains these topics:
You must set up the following data structures for certain examples to work:
CONNECT system/manager; DROP USER aqadm CASCADE; GRANT CONNECT, RESOURCE TO aqadm; CREATE USER aqadm IDENTIFIED BY aqadm; GRANT EXECUTE ON DBMS_AQADM TO aqadm; GRANT Aq_administrator_role TO aqadm; DROP USER aq CASCADE; CREATE USER aq IDENTIFIED BY aq; GRANT CONNECT, RESOURCE TO aq; GRANT EXECUTE ON dbms_aq TO aq;
The following examples illustrate how to create Oracle Streams AQ queues and queue tables:
Example 2-1 Creating a Queue Table and Queue of Object Type
/* Creating a message type: */ CREATE type aq.Message_typ as object ( subject VARCHAR2(30), text VARCHAR2(80)); /* Creating a object type queue table and queue: */ EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.objmsgs80_qtab', queue_payload_type => 'aq.Message_typ'); EXECUTE DBMS_AQADM.CREATE_QUEUE ( queue_name => 'msg_queue', queue_table => 'aq.objmsgs80_qtab'); EXECUTE DBMS_AQADM.START_QUEUE ( queue_name => 'msg_queue');
Example 2-2 Creating a Queue Table and Queue of Raw Type
/* Creating a RAW type queue table and queue: */ EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.RawMsgs_qtab', queue_payload_type => 'RAW'); EXECUTE DBMS_AQADM.CREATE_QUEUE ( queue_name => 'raw_msg_queue', queue_table => 'aq.RawMsgs_qtab'); EXECUTE DBMS_AQADM.START_QUEUE ( queue_name => 'raw_msg_queue');
Example 2-3 Creating a Prioritized Message Queue Table and Queue
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.priority_msg', sort_list => 'PRIORITY,ENQ_TIME', queue_payload_type => 'aq.Message_typ'); EXECUTE DBMS_AQADM.CREATE_QUEUE ( queue_name => 'priority_msg_queue', queue_table => 'aq.priority_msg'); EXECUTE DBMS_AQADM.START_QUEUE ( queue_name => 'priority_msg_queue');
Example 2-4 Creating a Multiconsumer Queue Table and Queue
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.MultiConsumerMsgs_qtab', multiple_consumers => TRUE, queue_payload_type => 'aq.Message_typ'); EXECUTE DBMS_AQADM.CREATE_QUEUE ( queue_name => 'msg_queue_multiple', queue_table => 'aq.MultiConsumerMsgs_qtab'); EXECUTE DBMS_AQADM.START_QUEUE ( queue_name => 'msg_queue_multiple');
Example 2-5 Creating a Queue to Demonstrate Propagation
EXECUTE DBMS_AQADM.CREATE_QUEUE ( queue_name => 'another_msg_queue', queue_table => 'aq.MultiConsumerMsgs_qtab'); EXECUTE DBMS_AQADM.START_QUEUE ( queue_name => 'another_msg_queue');
Example 2-6 Setting Up Java Oracle Streams AQ Examples
CONNECT system/manager DROP USER aqjava CASCADE; GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO aqjava IDENTIFIED BY aqjava; GRANT EXECUTE ON DBMS_AQADM TO aqjava; GRANT EXECUTE ON DBMS_AQ TO aqjava; CONNECT aqjava/aqjava /* Set up main class from which we will call subsequent examples and handle exceptions: */ import java.sql.*; import oracle.AQ.*; public class test_aqjava { public static void main(String args[]) { AQSession aq_sess = null; try { aq_sess = createSession(args); /* now run the test: */ runTest(aq_sess); } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } }
Example 2-7 Creating a Java Oracle Streams AQ Session for User 'aqjava'
public static AQSession createSession(String args[]) { Connection db_conn; AQSession aq_sess = null; try { Class.forName("oracle.jdbc.driver.OracleDriver"); /* your actual hostname, port number, and SID will vary from what follows. Here we use 'dlsun736,' '5521,' and 'test,' respectively: */ db_conn = DriverManager.getConnection( "jdbc:oracle:thin:@dlsun736:5521:test", "aqjava", "aqjava"); System.out.println("JDBC Connection opened "); db_conn.setAutoCommit(false); /* Load the Oracle8i AQ driver: */ Class.forName("oracle.AQ.AQOracleDriver"); /* Creating an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); System.out.println("Successfully created AQSession "); } catch (Exception ex) { System.out.println("Exception: " + ex); ex.printStackTrace(); } return aq_sess; }
Example 2-8 Creating a Queue Table and Queue Using Java
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue; /* Creating a AQQueueTableProperty object (payload type - RAW): */ qtable_prop = new AQQueueTableProperty("RAW"); /* Creating a queue table called aq_table1 in aqjava schema: */ q_table = aq_sess.createQueueTable ("aqjava", "aq_table1", qtable_prop); System.out.println("Successfully created aq_table1 in aqjava schema"); /* Creating a new AQQueueProperty object */ queue_prop = new AQQueueProperty(); /* Creating a queue called aq_queue1 in aq_table1: */ queue = aq_sess.createQueue (q_table, "aq_queue1", queue_prop); System.out.println("Successfully created aq_queue1 in aq_table1"); } /* Get a handle to an existing queue table and queue: */ public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; /* Get a handle to queue table - aq_table1 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table1"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue1 in aqjava schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue1"); System.out.println("Successful getQueue"); }
Example 2-9 Creating a Queue and Starting Enqueue or Dequeue Using Java
{ AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue; /* Creating a AQQueueTable property object (payload type - RAW): */ qtable_prop = new AQQueueTableProperty("RAW"); qtable_prop.setCompatible("8.1"); /* Creating a queue table called aq_table3 in aqjava schema: */ q_table = aq_sess.createQueueTable ("aqjava", "aq_table3", qtable_prop); System.out.println("Successful createQueueTable"); /* Creating a new AQQueueProperty object: */ queue_prop = new AQQueueProperty(); /* Creating a queue called aq_queue3 in aq_table3: */ queue = aq_sess.createQueue (q_table, "aq_queue3", queue_prop); System.out.println("Successful createQueue"); /* Enable enqueue/dequeue on this queue: */ queue.start(); System.out.println("Successful start queue"); /* Grant enqueue_any privilege on this queue to user scott: */ queue.grantQueuePrivilege("ENQUEUE", "scott"); System.out.println("Successful grantQueuePrivilege"); }
Example 2-10 Creating a Multiconsumer Queue and Adding Subscribers Using Java
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue; AQAgent subs1, subs2; /* Creating a AQQueueTable property object (payload type - RAW): */ qtable_prop = new AQQueueTableProperty("RAW"); System.out.println("Successful setCompatible"); /* Set multiconsumer flag to true: */ qtable_prop.setMultiConsumer(true); /* Creating a queue table called aq_table4 in aqjava schema: */ q_table = aq_sess.createQueueTable ("aqjava", "aq_table4", qtable_prop); System.out.println("Successful createQueueTable"); /* Creating a new AQQueueProperty object: */ queue_prop = new AQQueueProperty(); /* Creating a queue called aq_queue4 in aq_table4 */ queue = aq_sess.createQueue (q_table, "aq_queue4", queue_prop); System.out.println("Successful createQueue"); /* Enable enqueue/dequeue on this queue: */ queue.start(); System.out.println("Successful start queue"); /* Add subscribers to this queue: */ subs1 = new AQAgent("GREEN", null, 0); subs2 = new AQAgent("BLUE", null, 0); queue.addSubscriber(subs1, null); /* no rule */ System.out.println("Successful addSubscriber 1"); queue.addSubscriber(subs2, "priority < 2"); /* with rule */ System.out.println("Successful addSubscriber 2"); }
You must set up data structures similar to the following for certain examples to work:
$ cat >> message.typ case=lower type aq.message_typ $ $ ott userid=aq/aq intyp=message.typ outtyp=message_o.typ \ code=c hfile=demo.h $ $ proc intyp=message_o.typ iname=program name \ config=config file SQLCHECK=SEMANTICS userid=aq/aq
The following examples illustrate how to enqueue and dequeue Oracle Streams AQ messages:
Enqueuing and Dequeuing Object Type Messages Using Pro*C/C++
Enqueuing and Dequeuing Object Type Messages (CustomDatum interface) Using Java
Enqueuing and Dequeuing Object Type Messages (using SQLData interface) Using Java
Enqueuing and Dequeuing Messages with Time Delay and Expiration Using PL/SQL
Enqueuing and Dequeuing Messages by Correlation and Message ID Using Pro*C/C++
Enqueuing and Dequeuing Messages by Correlation and Message ID Using OCI
Enqueuing and Dequeuing Messages to/from a Multiconsumer Queue Using PL/SQL
Enqueuing and Dequeuing Messages to/from a Multiconsumer Queue using OCI
Enqueuing and Dequeuing Messages Using Message Grouping Using PL/SQL
Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using PL/SQL
Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using Java
Example 2-11 Enqueuing and Dequeuing Object Type Messages Using PL/SQL
To enqueue a single message without any other parameters specify the queue name and the payload.
/* Enqueue to msg_queue: */ DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN message := message_typ('NORMAL MESSAGE', 'enqueued to msg_queue first.'); dbms_aq.enqueue(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END; /* Dequeue from msg_queue: */ DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
Example 2-12 Enqueuing and Dequeuing Object Type Messages Using Pro*C/C++
#include <stdio.h> #include <string.h> #include <sqlca.h> #include <sql2oci.h> /* The header file generated by processing object type 'aq.Message_typ': */ #include "pceg.h" void sql_error(msg) char *msg; { EXEC SQL WHENEVER SQLERROR CONTINUE; printf("%s\n", msg); printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc); EXEC SQL ROLLBACK WORK RELEASE; exit(1); } main() { Message_typ *message = (Message_typ*)0; /* payload */ message_type_ind *imsg; /*payload indicator*/ char user[60]="aq/AQ"; /* user logon password */ char subject[30]; /* components of the */ char txt[80]; /* payload type */ /* ENQUEUE and DEQUEUE to an OBJECT QUEUE */ /* Connect to database: */ EXEC SQL CONNECT :user; /* On an oracle error print the error number :*/ EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :"); /* Allocate memory for the host variable from the object cache : */ EXEC SQL ALLOCATE :message; /* ENQUEUE */ strcpy(subject, "NORMAL ENQUEUE"); strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); /* Initialize the components of message : */ EXEC SQL OBJECT SET subject, text OF :message TO :subject, :txt; /* Embedded PLSQL call to the AQ enqueue procedure : */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; enqueue_options dbms_aq.enqueue_options_t; msgid RAW(16); BEGIN /* Bind the host variable 'message' to the payload: */ dbms_aq.enqueue(queue_name => 'msg_queue', message_properties => message_properties, enqueue_options => enqueue_options, payload => :message:imsg, /* indicator must be specified */ msgid => msgid); END; END-EXEC; /* Commit work */ EXEC SQL COMMIT; printf("Enqueued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* Dequeue */ /* Embedded PLSQL call to the AQ dequeue procedure : */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Return the payload into the host variable 'message': */ dbms_aq.dequeue(queue_name => 'msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work :*/ EXEC SQL COMMIT; /* Extract the components of message: */ EXEC SQL OBJECT GET SUBJECT,TEXT FROM :message INTO :subject,:txt; printf("Dequeued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); }
Example 2-13 Enqueuing and Dequeuing Object Type Messages Using OCI
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit(&envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue into the msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); }
Example 2-14 Enqueuing and Dequeuing Object Type Messages (CustomDatum interface) Using Java
To enqueue and dequeue object type messages follow the lettered steps:
a. Create the SQL type for the Queue Payload
connect aquser/aquser create type ADDRESS as object (street VARCHAR (30), city VARCHAR(30)); create type PERSON as object (name VARCHAR (30), home ADDRESS);
b. Generate the java class that maps to the PERSON ADT and implements the CustomDatum interface (using Jpublisher tool)
jpub -user=aquser/aquser -sql=ADDRESS,PERSON -case=mixed -usertypes=oracle -methods=false -compatible=CustomDatum
This creates two classes, PERSON.java
and ADDRESS.java
, corresponding to the PERSON
and ADDRESS
ADT types.
c. Create the queue table and queue with ADT payload
d. Enqueue and dequeue messages containing object payloads
public static void AQObjectPayloadTest(AQSession aq_sess) throws AQException, SQLException, ClassNotFoundException { Connection db_conn = null; AQQueue queue = null; AQMessage message = null; AQObjectPayload payload = null; AQEnqueueOption eq_option = null; AQDequeueOption dq_option = null; PERSON pers = null; PERSON pers2= null; ADDRESS addr = null; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue = aq_sess.getQueue("aquser", "test_queue2"); /* Enable enqueue/dequeue on this queue */ queue.start(); /* Enqueue a message in test_queue2 */ message = queue.createMessage(); pers = new PERSON(); pers.setName("John"); addr = new ADDRESS(); addr.setStreet("500 Easy Street"); addr.setCity("San Francisco"); pers.setHome(addr); payload = message.getObjectPayload(); payload.setPayloadData(pers); eq_option = new AQEnqueueOption(); /* Enqueue a message into test_queue2 */ queue.enqueue(eq_option, message); db_conn.commit(); /* Dequeue a message from test_queue2 */ dq_option = new AQDequeueOption(); message = ((AQOracleQueue)queue).dequeue(dq_option, PERSON.getFactory()); payload = message.getObjectPayload(); pers2 = (PERSON) payload.getPayloadData(); System.out.println("Object data retrieved: [PERSON]"); System.out.println("Name: " + pers2.getName()); System.out.println("Address "); System.out.println("Street: " + pers2.getHome().getStreet()); System.out.println("City: " + pers2.getHome().getCity()); db_conn.commit(); }
Example 2-15 Enqueuing and Dequeuing Object Type Messages (using SQLData interface) Using Java
To enqueue and dequeue object type messages follow the lettered steps:
a. Create the SQL type for the Queue Payload.
connect aquser/aquser create type EMPLOYEE as object (empname VARCHAR (50), empno INTEGER);
b. Create a java class that maps to the EMPLOYEE ADT and implements the SQLData interface. This class can also be generated using JPublisher using the following syntax.
jpub -user=aquser/aquser -sql=EMPLOYEE -case=mixed -usertypes=jdbc -methods=false import java.sql.*; import oracle.jdbc2.*; public class Employee implements SQLData { private String sql_type; public String empName; public int empNo; public Employee() {} public Employee (String sql_type, String empName, int empNo) { this.sql_type = sql_type; this.empName = empName; this.empNo = empNo; } ////// implements SQLData ////// public String getSQLTypeName() throws SQLException { return sql_type; } public void readSQL(SQLInput stream, String typeName) throws SQLException { sql_type = typeName; empName = stream.readString(); empNo = stream.readInt(); } public void writeSQL(SQLOutput stream) throws SQLException { stream.writeString(empName); stream.writeInt(empNo); } public String toString() { String ret_str = ""; ret_str += "[Employee]\n"; ret_str += "Name: " + empName + "\n"; ret_str += "Number: " + empNo + "\n"; return ret_str; } }
c. Create the queue table and queue with ADT payload.
public static void createEmployeeObjQueue(AQSession aq_sess) throws AQException { AQQueueTableProperty qt_prop = null; AQQueueProperty q_prop = null; AQQueueTable q_table = null; AQQueue queue = null; /* Message payload type is aquser.EMPLOYEE */ qt_prop = new AQQueueTableProperty("AQUSER.EMPLOYEE"); qt_prop.setComment("queue-table1"); /* Creating aQTable1 */ System.out.println("\nCreate QueueTable: [aqtable1]"); q_table = aq_sess.createQueueTable("aquser", "aqtable1", qt_prop); /* Create test_queue1 */ q_prop = new AQQueueProperty(); queue = q_table.createQueue("test_queue1", q_prop); /* Enable enqueue/dequeue on this queue */ queue.start(); }
d. Enqueue and dequeue messages containing object payloads.
public static void AQObjectPayloadTest2(AQSession aq_sess) throws AQException, SQLException, ClassNotFoundException { Connection db_conn = null; AQQueue queue = null; AQMessage message = null; AQObjectPayload payload = null; AQEnqueueOption eq_option = null; AQDequeueOption dq_option = null; Employee emp = null; Employee emp2 = null; Hashtable map; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get the Queue object */ queue = aq_sess.getQueue("aquser", "test_queue1"); /* Register Employee class (corresponding to EMPLOYEE Adt) * in the connection type map */ try { map = (java.util.Hashtable)(((OracleConnection)db_conn).getTypeMap()); map.put("AQUSER.EMPLOYEE", Class.forName("Employee")); } catch(Exception ex) { System.out.println("Error registering type: " + ex); } /* Enqueue a message in test_queue1 */ message = queue.createMessage(); emp = new Employee("AQUSER.EMPLOYEE", "Mark", 1007); /* Set the object payload */ payload = message.getObjectPayload(); payload.setPayloadData(emp); /* Enqueue a message into test_queue1*/ eq_option = new AQEnqueueOption(); queue.enqueue(eq_option, message); db_conn.commit(); /* Dequeue a message from test_queue1 */ dq_option = new AQDequeueOption(); message = queue.dequeue(dq_option, Class.forName("Employee")); payload = message.getObjectPayload(); emp2 = (Employee) payload.getPayloadData(); System.out.println("\nObject data retrieved: [EMPLOYEE]"); System.out.println("Name : " + emp2.empName); System.out.println("EmpId : " + emp2.empNo); db_conn.commit(); }
Example 2-16 Enqueuing and Dequeuing RAW Type Messages Using PL/SQL
DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message RAW(4096); BEGIN message := HEXTORAW(RPAD('FF',4095,'FF')); DBMS_AQ.ENQUEUE(queue_name => 'raw_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END; /* Dequeue from raw_msg_queue: */ /* Dequeue from raw_msg_queue: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message RAW(4096); BEGIN DBMS_AQ.DEQUEUE(queue_name => 'raw_msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
You must set up data structures similar to the following for certain examples to work:
$ cat >> message.typ case=lower type aq.message_typ $ $ ott userid=aq/aq intyp=message.typ outtyp=message_o.typ \ code=c hfile=demo.h $ $ proc intyp=message_o.typ iname=program name \ config=config file SQLCHECK=SEMANTICS userid=aq/aq
Example 2-17 Enqueuing and Dequeuing RAW Type Messages Using Pro*C/C++
#include <stdio.h> #include <string.h> #include <sqlca.h> #include <sql2oci.h> void sql_error(msg) char *msg; { EXEC SQL WHENEVER SQLERROR CONTINUE; printf("%s\n", msg); printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc); EXEC SQL ROLLBACK WORK RELEASE; exit(1); } main() { OCIEnv *oeh; /* OCI Env handle */ OCIError *err; /* OCI Err handle */ OCIRaw *message= (OCIRaw*)0; /* payload */ ub1 message_txt[100]; /* data for payload */ char user[60]="aq/AQ"; /* user logon password */ int status; /* returns status of the OCI call */ /* Enqueue and dequeue to a RAW queue */ /* Connect to database: */ EXEC SQL CONNECT :user; /* On an oracle error print the error number: */ EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :"); /* Get the OCI Env handle: */ if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS) { printf(" error in SQLEnvGet \n"); exit(1); } /* Get the OCI Error handle: */ if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err, (ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0)) { printf(" error in OCIHandleAlloc %d \n", status); exit(1); } /* Enqueue */ /* The bytes to be put into the raw payload:*/ strcpy(message_txt, "Enqueue to a Raw payload queue "); /* Assign bytes to the OCIRaw pointer : Memory must be allocated explicitly to OCIRaw*: */ if (status=OCIRawAssignBytes(oeh, err, message_txt, 100, &message)) { printf(" error in OCIRawAssignBytes %d \n", status); exit(1); } /* Embedded PLSQL call to the AQ enqueue procedure : */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; enqueue_options dbms_aq.enqueue_options_t; msgid RAW(16); BEGIN /* Bind the host variable message to the raw payload: */ dbms_aq.enqueue(queue_name => 'raw_msg_queue', message_properties => message_properties, enqueue_options => enqueue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; /* Dequeue */ /* Embedded PLSQL call to the AQ dequeue procedure :*/ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Return the raw payload into the host variable 'message':*/ dbms_aq.dequeue(queue_name => 'raw_msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; }
Example 2-18 Enqueuing and Dequeuing RAW Type Messages Using OCI
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; char msg_text[100]; OCIRaw *mesg = (OCIRaw *)0; OCIRaw *deqmesg = (OCIRaw *)0; OCIInd ind = 0; dvoid *indptr = (dvoid *)&ind; int i; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain the TDO of the RAW data type */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQADM", strlen("AQADM"), (CONST text *)"RAW", strlen("RAW"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ strcpy(msg_text, "Enqueue to a RAW queue"); OCIRawAssignBytes(envhp, errhp, msg_text, strlen(msg_text), &mesg); /* Enqueue the message into raw_msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&indptr, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue the same message into C variable deqmesg */ OCIAQDeq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, 0, 0); for (i = 0; i < OCIRawSize(envhp, deqmesg); i++) printf("%c", *(OCIRawPtr(envhp, deqmesg) + i)); OCITransCommit(svchp, errhp, (ub4) 0); }
Example 2-19 Enqueuing RAW Messages Using Java
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aquser schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Creating a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Creating a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); db_conn.commit(); }
Example 2-20 Dequeuing Messages Using Java
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aquser schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Creating a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Creating a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); System.out.println("Successful enqueue"); db_conn.commit(); /* Creating a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); /* Dequeue a message: */ message = queue.dequeue(deq_option); System.out.println("Successful dequeue"); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); db_conn.commit(); }
Example 2-21 Dequeuing Messages in Browse Mode Using Java
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueueTable q_table; AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aquser schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Creating a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Creating a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); System.out.println("Successful enqueue"); db_conn.commit(); /* Creating a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); /* Set dequeue mode to BROWSE: */ deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE); /* Set wait time to 10 seconds: */ deq_option.setWaitTime(10); /* Dequeue a message: */ message = queue.dequeue(deq_option); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); String ret_value = new String(b_array); System.out.println("Dequeued message: " + ret_value); db_conn.commit(); }
Example 2-22 Enqueuing and Dequeuing Messages by Priority Using PL/SQL
When two messages are enqueued with the same priority, the message which was enqueued earlier is dequeued first. However, if two messages are of different priorities, then the message with the lower value (higher priority) is dequeued first.
/* Enqueue two messages with priority 30 and 5: */ DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN message := message_typ('PRIORITY MESSAGE', 'enqued at priority 30.'); message_properties.priority := 30; DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('PRIORITY MESSAGE', 'Enqueued at priority 5.'); message_properties.priority := 5; DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); END; /* Dequeue from priority queue: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE(queue_name => 'priority_msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; DBMS_AQ.DEQUEUE(queue_name => 'priority_msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END; /* On return, the second message with priority set to 5 is retrieved before the message with priority set to 30 because priority takes precedence over enqueue time. */
Example 2-23 Enqueuing Messages with Priority Using Java
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; AQMessage message; AQMessageProperty m_property; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ qtable = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aqjava schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Enqueue 5 messages with priorities with different priorities: */ for (int i = 0; i < 5; i++ ) { /* Creating a message to contain raw payload: */ message = queue.createMessage(); test_data = "Small_message_" + (i+1); /* some test data */ /* Get a handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Set message priority: */ m_property = message.getMessageProperty(); if( i < 2) m_property.setPriority(2); else m_property.setPriority(3); /* Creating a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); System.out.println("Successful enqueue"); } db_conn.commit(); }
Example 2-24 Dequeuing Messages after Preview by Criterion Using PL/SQL
An application can preview messages in browse mode or locked mode without deleting the message. The message of interest can then be removed from the queue.
/* Enqueue 6 messages to msg_queue — GREEN, GREEN, YELLOW, VIOLET, BLUE, RED */ DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN message := message_typ('GREEN', 'GREEN enqueued to msg_queue first.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('GREEN', 'GREEN also enqueued to msg_queue second.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('YELLOW', 'YELLOW enqueued to msg_queue third.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message handle: ' || message_handle); message := message_typ('VIOLET', 'VIOLET enqueued to msg_queue fourth.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('BLUE', 'BLUE enqueued to msg_queue fifth.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('RED', 'RED enqueued to msg_queue sixth.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END; /* Dequeue in BROWSE mode until RED is found, and remove RED from queue: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN dequeue_options.dequeue_mode := DBMS_AQ.BROWSE; LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); EXIT WHEN message.subject = 'RED'; END LOOP; dequeue_options.dequeue_mode := DBMS_AQ.REMOVE; dequeue_options.msgid := message_handle; DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END; /* Dequeue in LOCKED mode until BLUE is found, and remove BLUE from queue: */ DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN dequeue_options.dequeue_mode := dbms_aq.LOCKED; LOOP dbms_aq.dequeue(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text ); EXIT WHEN message.subject = 'BLUE'; END LOOP; dequeue_options.dequeue_mode := dbms_aq.REMOVE; dequeue_options.msgid := message_handle; dbms_aq.dequeue(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
Expiration is calculated from the earliest dequeue time. So, if an application wants a message to be dequeued no earlier than a week from now, but no later than 3 weeks from now, then this requires setting the expiration time for 2 weeks. This scenario is described in the following code segment.
Example 2-25 Enqueuing and Dequeuing Messages with Time Delay and Expiration Using PL/SQL
/* Enqueue message for delayed availability: */ DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.Message_typ; BEGIN message := Message_typ('DELAYED', 'This message is delayed one week.'); message_properties.delay := 7*24*60*60; message_properties.expiration := 2*7*24*60*60; dbms_aq.enqueue(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
You must set up data structures similar to the following for certain examples to work:
$ cat >> message.typ case=lower type aq.message_typ $ $ ott userid=aq/aq intyp=message.typ outtyp=message_o.typ \ code=c hfile=demo.h $ $ proc intyp=message_o.typ iname=program name \ config=config file SQLCHECK=SEMANTICS userid=aq/aq
Example 2-26 Enqueuing and Dequeuing Messages by Correlation and Message ID Using Pro*C/C++
#include <stdio.h> #include <string.h> #include <sqlca.h> #include <sql2oci.h> /* The header file generated by processing object type 'aq.Message_typ': */ #include "pceg.h" void sql_error(msg) char *msg; { EXEC SQL WHENEVER SQLERROR CONTINUE; printf("%s\n", msg); printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc); EXEC SQL ROLLBACK WORK RELEASE; exit(1); } main() { OCIEnv *oeh; /* OCI Env Handle */ OCIError *err; /* OCI Error Handle */ Message_typ *message = (Message_typ*)0; /* queue payload */ message_type_ind *imsg; /*payload indicator*/ OCIRaw *msgid = (OCIRaw*)0; /* message id */ ub1 msgmem[16]=""; /* memory for msgid */ char user[60]="aq/AQ"; /* user login password */ char subject[30]; /* components of */ char txt[80]; /* Message_typ */ char correlation1[30]; /* message correlation */ char correlation2[30]; int status; /* code returned by the OCI calls */ /* Dequeue by correlation and msgid */ /* Connect to the database: */ EXEC SQL CONNECT :user; EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :"); /* Allocate space in the object cache for the host variable: */ EXEC SQL ALLOCATE :message; /* Get the OCI Env handle: */ if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS) { printf(" error in SQLEnvGet \n"); exit(1); } /* Get the OCI Error handle: */ if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err, (ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0)) { printf(" error in OCIHandleAlloc %d \n", status); exit(1); } /* Assign memory for msgid: Memory must be allocated explicitly to OCIRaw*: */ if (status=OCIRawAssignBytes(oeh, err, msgmem, 16, &msgid)) { printf(" error in OCIRawAssignBytes %d \n", status); exit(1); } /* First enqueue */ strcpy(correlation1, "1st message"); strcpy(subject, "NORMAL ENQUEUE1"); strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); /* Initialize the components of message: */ EXEC SQL OJECT SET subject, text OF :message TO :subject, :txt; /* Embedded PLSQL call to the AQ enqueue procedure: */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; enqueue_options dbms_aq.enqueue_options_t; BEGIN /* Bind the host variable 'correlation1': to message correlation*/ message_properties.correlation := :correlation1; /* Bind the host variable 'message' to payload and return message ID into host variable 'msgid': */ dbms_aq.enqueue(queue_name => 'msg_queue', message_properties => message_properties, enqueue_options => enqueue_options, payload => :message:imsg, /* indicator must be specified */ msgid => :msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; printf("Enqueued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* Second enqueue */ strcpy(correlation2, "2nd message"); strcpy(subject, "NORMAL ENQUEUE2"); strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); /* Initialize the components of message: */ EXEC SQL OBJECT SET subject, text OF :messsage TO :subject,:txt; /* Embedded PLSQL call to the AQ enqueue procedure: */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; enqueue_options dbms_aq.enqueue_options_t; msgid RAW(16); BEGIN /* Bind the host variable 'correlation2': to message correlaiton */ message_properties.correlation := :correlation2; /* Bind the host variable 'message': to payload */ dbms_aq.enqueue(queue_name => 'msg_queue', message_properties => message_properties, enqueue_options => enqueue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; printf("Enqueued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* First dequeue - by correlation */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Dequeue by correlation in host variable 'correlation2': */ dequeue_options.correlation := :correlation2; /* Return the payload into host variable 'message': */ dbms_aq.dequeue(queue_name => 'msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work : */ EXEC SQL COMMIT; /* Extract the values of the components of message: */ EXEC SQL OBJECT GET subject, text FROM :message INTO :subject,:txt; printf("Dequeued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* SECOND DEQUEUE - by MSGID */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Dequeue by msgid in host variable 'msgid': */ dequeue_options.msgid := :msgid; /* Return the payload into host variable 'message': */ dbms_aq.dequeue(queue_name => 'msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; }
Example 2-27 Enqueuing and Dequeuing Messages by Correlation and Message ID Using OCI
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue into the msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); }
Example 2-28 Enqueuing and Dequeuing Messages to/from a Multiconsumer Queue Using PL/SQL
/* Create subscriber list: */ DECLARE subscriber aq$_agent; /* Add subscribers RED and GREEN to the suscriber list: */ BEGIN subscriber := aq$_agent('RED', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'msg_queue_multiple', subscriber => subscriber); subscriber := aq$_agent('GREEN', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'msg_queue_multiple', subscriber => subscriber); END; DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; recipients DBMS_AQ.aq$_recipient_list_t; message_handle RAW(16); message aq.message_typ; /* Enqueue MESSAGE 1 for subscribers to the queue. BEGIN message := message_typ('MESSAGE 1', 'This message is queued for queue subscribers.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue_multiple', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); /* Enqueue MESSAGE 2 for specified recipients.*/ message := message_typ('MESSAGE 2', 'This message is queued for two recipients.'); recipients(1) := aq$_agent('RED', NULL, NULL); recipients(2) := aq$_agent('BLUE', NULL, NULL); message_properties.recipient_list := recipients; DBMS_AQ.ENQUEUE(queue_name => 'msg_queue_multiple', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
RED is both a subscriber to the queue, as well as being a specified recipient of MESSAGE 2. By contrast, GREEN is only a subscriber to those messages in the queue (in this case, MESSAGE) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.
/* Dequeue messages from msg_queue_multiple: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; no_messages exception; pragma exception_init (no_messages, -25228); BEGIN dequeue_options.wait := DBMS_AQ.NO_WAIT; BEGIN /* Consumer BLUE will get MESSAGE 2: */ dequeue_options.consumer_name := 'BLUE'; dequeue_options.navigation := FIRST_MESSAGE; LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue_multiple', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := NEXT_MESSAGE; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages for BLUE'); COMMIT; END; BEGIN /* Consumer RED will get MESSAGE 1 and MESSAGE 2: */ dequeue_options.consumer_name := 'RED'; dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue_multiple', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := NEXT_MESSAGE; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages for RED'); COMMIT; END; BEGIN /* Consumer GREEN will get MESSAGE 1: */ dequeue_options.consumer_name := 'GREEN'; dequeue_options.navigation := FIRST_MESSAGE; LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue_multiple', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := NEXT_MESSAGE; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages for GREEN'); COMMIT; END;
You must set up the following data structures for certain examples to work:
CONNECT aqadm/aqadm EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'aq.qtable_multi', multiple_consumers => true, queue_payload_type => 'aq.message_typ'); EXECUTE DBMS_AQADM.START_QUEUE('aq.msg_queue_multiple'); CONNECT aq/aq
Example 2-29 Enqueuing and Dequeuing Messages to/from a Multiconsumer Queue using OCI
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0; OCIAQAgent *agents[2]; OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = OCI_DEQ_NO_WAIT; ub4 navigation = OCI_DEQ_FIRST_MSG; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 1", strlen("MESSAGE 1"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for queue subscribers", strlen("mesg for queue subscribers"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue MESSAGE 1 for subscribers to the queue. */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); /* Enqueue MESSAGE 2 for specified recipients. */ /* prepare message payload */ OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 2", strlen("MESSAGE 2"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for two recipients", strlen("mesg for two recipients"), &mesg->data); /* Allocate AQ message properties and agent descriptors */ OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0); OCIDescriptorAlloc(envhp, (dvoid **)&agents[0], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); OCIDescriptorAlloc(envhp, (dvoid **)&agents[1], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); /* Prepare the recipient list, RED and BLUE */ OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, "RED", strlen("RED"), OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, "BLUE", strlen("BLUE"), OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2, OCI_ATTR_RECIPIENT_LIST, errhp); OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Now dequeue the messages using different consumer names */ /* Allocate dequeue options descriptor to set the dequeue options */ OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0); /* Set wait parameter to NO_WAIT so that the dequeue returns immediately */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp); /* Set navigation to FIRST_MESSAGE so that the dequeue resets the position */ /* after a new consumer_name is set in the dequeue options */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp); /* Dequeue from the msg_queue_multiple as consumer BLUE */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE", strlen("BLUE"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue_multiple as consumer RED */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED", strlen("RED"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue_multiple as consumer GREEN */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)"GREEN",strlen("GREEN"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); }
Example 2-30 Enqueuing and Dequeuing Messages Using Message Grouping Using PL/SQL
CONNECT aq/aq EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.msggroup', queue_payload_type => 'aq.message_typ', message_grouping => DBMS_AQADM.TRANSACTIONAL); EXECUTE DBMS_AQADM.CREATE_QUEUE( queue_name => 'msggroup_queue', queue_table => 'aq.msggroup'); EXECUTE DBMS_AQADM.START_QUEUE( queue_name => 'msggroup_queue'); /* Enqueue three messages in each transaction */ DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN /* Loop through three times, committing after every iteration */ FOR txnno in 1..3 LOOP /* Loop through three times, enqueuing each iteration */ FOR mesgno in 1..3 LOOP message := message_typ('GROUP#' || txnno, 'Message#' || mesgno || ' in group' || txnno); DBMS_AQ.ENQUEUE(queue_name => 'msggroup_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); END LOOP; /* Commit the transaction */ COMMIT; END LOOP; END; /* Now dequeue the messages as groups */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; no_messages exception; end_of_group exception; PRAGMA EXCEPTION_INIT (no_messages, -25228); PRAGMA EXCEPTION_INIT (end_of_group, -25235); BEGIN dequeue_options.wait := DBMS_AQ.NO_WAIT; dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE; LOOP BEGIN DBMS_AQ.DEQUEUE(queue_name => 'msggroup_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE; EXCEPTION WHEN end_of_group THEN DBMS_OUTPUT.PUT_LINE ('Finished processing a group of messages'); COMMIT; dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION; END; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages'); END;
Example 2-31 Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using PL/SQL
/* Create the message payload object type with one or more LOB attributes. On enqueue, set the LOB attribute to EMPTY_BLOB. After the enqueue completes, before you commit your transaction. Select the LOB attribute from the user_data column of the queue table or queue table view. You can now use the LOB interfaces (which are available through both OCI and PL/SQL) to write the LOB data to the queue. On dequeue, the message payload will contain the LOB locator. You can use this LOB locator after the dequeue, but before you commit your transaction, to read the LOB data. */ /* Setup the accounts: */ connect system/manager CREATE USER aqadm IDENTIFIED BY aqadm; GRANT CONNECT, RESOURCE TO aqadm; GRANT aq_administrator_role TO aqadm; CREATE USER aq IDENTIFIED BY aq; GRANT CONNECT, RESOURCE TO aq; GRANT EXECUTE ON DBMS_AQ TO aq; CREATE TYPE aq.message AS OBJECT(id NUMBER, subject VARCHAR2(100), data BLOB, trailer NUMBER); CREATE TABLESPACE aq_tbs DATAFILE 'aq.dbs' SIZE 2M REUSE; /* create the queue table, queues and start the queue: */ CONNECT aqadm/aqadm EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'aq.qt1', queue_payload_type => 'aq.message'); EXECUTE DBMS_AQADM.CREATE_QUEUE( queue_name => 'aq.queue1', queue_table => 'aq.qt1'); EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'aq.queue1'); /* End set up: */ /* Enqueue Large data types: */ CONNECT aq/aq CREATE OR REPLACE PROCEDURE blobenqueue(msgno IN NUMBER) AS enq_userdata aq.message; enq_msgid RAW(16); enqopt DBMS_AQ.enqueue_options_t; msgprop DBMS_AQ.message_properties_t; lob_loc BLOB; buffer RAW(4096); BEGIN buffer := HEXTORAW(RPAD('FF', 4096, 'FF')); enq_userdata := aq.message(msgno, 'Large Lob data', EMPTY_BLOB(), msgno); DBMS_AQ.ENQUEUE('aq.queue1', enqopt, msgprop, enq_userdata, enq_msgid); --select the lob locator for the queue table SELECT t.user_data.data INTO lob_loc FROM qt1 t WHERE t.msgid = enq_msgid; DBMS_LOB.WRITE(lob_loc, 2000, 1, buffer ); COMMIT; END; /* Dequeue lob data: */ CREATE OR REPLACE PROCEDURE blobdequeue AS dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; mid RAW(16); pload aq.message; lob_loc BLOB; amount BINARY_INTEGER; buffer RAW(4096); BEGIN DBMS_AQ.DEQUEUE('aq.queue1', dequeue_options, message_properties, pload, mid); lob_loc := pload.data; -- read the lob data info buffer amount := 2000; DBMS_LOB.READ(lob_loc, amount, 1, buffer); DBMS_OUTPUT.PUT_LINE('Amount of data read: '||amount); COMMIT; END; /* Do the enqueues and dequeues: */ SET SERVEROUTPUT ON BEGIN FOR i IN 1..5 LOOP blobenqueue(i); END LOOP; END; BEGIN FOR i IN 1..5 LOOP blobdequeue(); END LOOP; END;
Example 2-32 Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using Java
1. Create the message type (ADT with CLOB and BLOB).
connect aquser/aquser create type LobMessage as object(id NUMBER, subject varchar2(100), data blob, cdata clob, trailer number);
2. Create the queue table and queue.
connect aquser/aquser EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'qt_adt', queue_payload_type => 'LOBMESSAGE', comment => 'single-consumer, default sort ordering, ADT Message', compatible => '8.1.0' ); EXECUTE DBMS_AQADM.CREATE_QUEUE( queue_name => 'q1_adt', queue_table => 'qt_adt' ); EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'q1_adt');
3. Run jpublisher to generate the java class that maps to the LobMessage.
Oracle object type jpub -user=aquser/aquser -sql=LobMessage -case=mixed -methods=false -usertypes=oracle -compatible=CustomDatum
4. Enqueue and dequeue messages.
The following examples illustrate Oracle Streams AQ propagation:
Managing Propagation From One Queue To Other Queues In the Same Database Using PL/SQL
Managing Propagation from One Queue to Other Queues In Another Database Using PL/SQL
Unscheduling Propagation Using PL/SQL
Caution: You must create queues or queue tables, or start or enable queues, for certain examples to work. |
Example 2-33 Enqueuing Messages for Remote Subscribers or Recipients to a Multiconsumer Queue and Propagation Scheduling Using PL/SQL
/* Create subscriber list: */ DECLARE subscriber aq$_agent; /* Add subscribers RED and GREEN with different addresses to the suscriber list: */ BEGIN BEGIN /* Add subscriber RED that will dequeue messages from another_msg_queue queue in the same datatbase */ subscriber := aq$_agent('RED', 'another_msg_queue', NULL); DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'msg_queue_multiple', subscriber => subscriber); /* Schedule propagation from msg_queue_multiple to other queues in the same database: */ DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'msg_queue_multiple'); /* Add subscriber GREEN that will dequeue messages from the msg_queue queue in another database reached by the database link another_db.world */ subscriber := aq$_agent('GREEN', 'msg_queue@another_db.world', NULL); DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'msg_queue_multiple', subscriber => subscriber); /* Schedule propagation from msg_queue_multiple to other queues in the database "another_database": */ END; BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'msg_queue_multiple', destination => 'another_db.world'); END; END; DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; recipients DBMS_AQ.aq$_recipient_list_t; message_handle RAW(16); message aq.message_typ; /* Enqueue MESSAGE 1 for subscribers to the queue. */ BEGIN message := message_typ('MESSAGE 1', 'This message is queued for queue subscribers.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue_multiple', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); /* Enqueue MESSAGE 2 for specified recipients.*/ message := message_typ('MESSAGE 2', 'This message is queued for two recipients.'); recipients(1) := aq$_agent('RED', 'another_msg_queue', NULL); recipients(2) := aq$_agent('BLUE', NULL, NULL); message_properties.recipient_list := recipients; DBMS_AQ.ENQUEUE(queue_name => 'msg_queue_multiple', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
Note: RED at address another_msg_queue is both a subscriber to the queue, as well as being a specified recipient of MESSAGE 2 . By contrast, GREEN at address msg_queue@another_db.world is only a subscriber to those messages in the queue (in this case, MESSAGE 1 ) for which no recipients have been specified. BLUE , while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2 . |
Example 2-34 Managing Propagation From One Queue To Other Queues In the Same Database Using PL/SQL
/* Schedule propagation from queue q1def to other queues in the same database */ EXECUTE DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'q1def'); /* Disable propagation from queue q1def to other queues in the same database */ EXECUTE DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE( queue_name => 'q1def'); /* Alter schedule from queue q1def to other queues in the same database */ EXECUTE DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE( queue_name => 'q1def', duration => '2000', next_time => 'SYSDATE + 3600/86400', latency => '32'); /* Enable propagation from queue q1def to other queues in the same database */ EXECUTE DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE( queue_name => 'q1def'); /* Unschedule propagation from queue q1def to other queues in the same database */ EXECUTE DBMS_AQADM.UNSCHEDULE_PROPAGATION( queue_name => 'q1def');
Example 2-35 Managing Propagation from One Queue to Other Queues In Another Database Using PL/SQL
/* Schedule propagation from queue q1def to other queues in another database reached by the database link another_db.world */ EXECUTE DBMS_AQADM.SCHEDULE_PROPAGATION( queue_name => 'q1def', destination => 'another_db.world'); /* Disable propagation from queue q1def to other queues in another database reached by the database link another_db.world */ EXECUTE DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE( queue_name => 'q1def', destination => 'another_db.world'); /* Alter schedule from queue q1def to other queues in another database reached by the database link another_db.world */ EXECUTE DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE( queue_name => 'q1def', destination => 'another_db.world', duration => '2000', next_time => 'SYSDATE + 3600/86400', latency => '32'); /* Enable propagation from queue q1def to other queues in another database reached by the database link another_db.world */ EXECUTE DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE( queue_name => 'q1def', destination => 'another_db.world'); /* Unschedule propagation from queue q1def to other queues in another database reached by the database link another_db.world */ EXECUTE DBMS_AQADM.UNSCHEDULE_PROPAGATION( queue_name => 'q1def', destination => 'another_db.world');
The following example illustrates how to drop Oracle Streams AQ objects.
Caution: You must create queues or queue tables, or start, stop, or enable queues, for certain examples to work. |
Example 2-37 Dropping Oracle Streams AQ Objects
/* Cleans up all objects related to the object type: */ CONNECT aq/aq EXECUTE DBMS_AQADM.STOP_QUEUE ( queue_name => 'msg_queue'); EXECUTE DBMS_AQADM.DROP_QUEUE ( queue_name => 'msg_queue'); EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => 'aq.objmsgs80_qtab'); /* Cleans up all objects related to the RAW type: */ EXECUTE DBMS_AQADM.STOP_QUEUE ( queue_name => 'raw_msg_queue'); EXECUTE DBMS_AQADM.DROP_QUEUE ( queue_name => 'raw_msg_queue'); EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => 'aq.RawMsgs_qtab'); /* Cleans up all objects related to the priority queue: */ EXECUTE DBMS_AQADM.STOP_QUEUE ( queue_name => 'priority_msg_queue'); EXECUTE DBMS_AQADM.DROP_QUEUE ( queue_name => 'priority_msg_queue'); EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => 'aq.priority_msg'); /* Cleans up all objects related to the multiple-consumer queue: */ EXECUTE DBMS_AQADM.STOP_QUEUE ( queue_name => 'msg_queue_multiple'); EXECUTE DBMS_AQADM.DROP_QUEUE ( queue_name => 'msg_queue_multiple'); EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => 'aq.MultiConsumerMsgs_qtab'); DROP TYPE aq.message_typ;
The following example illustrates how to revoke roles and privileges in Oracle Streams AQ.
You must set up the following data structures for certain examples to work:
CONNECT system/manager; DROP USER aqadm CASCADE; GRANT CONNECT, RESOURCE TO aqadm; CREATE USER aqadm IDENTIFIED BY aqadm; GRANT EXECUTE ON DBMS_AQADM TO aqadm; GRANT Aq_administrator_role TO aqadm; DROP USER aq CASCADE; CREATE USER aq IDENTIFIED BY aq; GRANT CONNECT, RESOURCE TO aq; GRANT EXECUTE ON dbms_aq TO aq; EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'aq.qtable', queue_payload_type => 'RAW'); EXECUTE DBMS_AQADM.CREATE_QUEUE( queue_name => 'aq.aqsqueue', queue_table => 'aq.qtable'); EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'aq.aqsqueue');
The following example illustrates how to deploy Oracle Streams AQ with XA.
Example 2-39 Deploying Oracle Streams AQ with XA
/* * The program uses the XA interface to enqueue 100 messages and then * dequeue them. * Login: aq/aq * Requires: AQ_USER_ROLE to be granted to aq * a RAW queue called "aqsqueue" to be created in aqs schema * (preceding steps can be performed by running aqaq.sql) * Message Format: Msgno: [0-1000] HELLO, WORLD! * Author: schandra@us.oracle.com */ #ifndef OCI_ORACLE #include <oci.h> #endif #include <xa.h> /* XA open string */ char xaoinfo[] = "oracle_xa+ACC=P/AQ/AQ+SESTM=30+Objects=T"; /* template for generating XA XIDs */ XID xidtempl = { 0x1e0a0a1e, 12, 8, "GTRID001BQual001" }; /* Pointer to Oracle XA function table */ extern struct xa_switch_t xaosw; /* Oracle XA switch */ static struct xa_switch_t *xafunc = &xaosw; /* dummy stubs for ax_reg and ax_unreg */ int ax_reg(rmid, xid, flags) int rmid; XID *xid; long flags; { xid->formatID = -1; return 0; } int ax_unreg(rmid, flags) int rmid; long flags; { return 0; } /* generate an XID */ void xidgen(xid, serialno) XID *xid; int serialno; { char seq [11]; sprintf(seq, "%d", serialno); memcpy((void *)xid, (void *)&xidtempl, sizeof(XID)); strncpy((&xid->data[5]), seq, 3); } /* check if XA operation succeeded */ #define checkXAerr(action, funcname) \ if ((action) != XA_OK) \ { \ printf("%s failed!\n", funcname); \ exit(-1); \ } else /* check if OCI operation succeeded */ static void checkOCIerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; if (status == OCI_ERROR) { OCIErrorGet((dvoid *) errhp, 1, (text *)0, &errcode, errbuf, (ub4)sizeof(errbuf), OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); } else printf("Error - %d\n", status); exit (-1); } void main(argc, argv) int argc; char **argv; { int msgno = 0; /* message being enqueued */ OCIEnv *envhp; /* OCI environment handle */ OCIError *errhp; /* OCI Error handle */ OCISvcCtx *svchp; /* OCI Service handle */ char message[128]; /* message buffer */ ub4 mesglen; /* length of message */ OCIRaw *rawmesg = (OCIRaw *)0; /* message in OCI RAW format */ OCIInd ind = 0; /* OCI null indicator */ dvoid *indptr = (dvoid *)&ind; /* null indicator pointer */ OCIType *mesg_tdo = (OCIType *) 0; /* TDO for RAW datatype */ XID xid; /* XA's global transaction id */ ub4 i; /* array index */ checkXAerr(xafunc->xa_open_entry(xaoinfo, 1, TMNOFLAGS), "xaoopen"); svchp = xaoSvcCtx((text *)0); /* get service handle from XA */ envhp = xaoEnv((text *)0); /* get enviornment handle from XA */ if (!svchp || !envhp) { printf("Unable to obtain OCI Handles from XA!\n"); exit (-1); } OCIHandleAlloc((dvoid *)envhp, (dvoid **)&errhp, OCI_HTYPE_ERROR, 0, (dvoid **)0); /* allocate error handle */ /* enqueue 1000 messages, 1 message for each XA transaction */ for (msgno = 0; msgno < 1000; msgno++) { sprintf((const char *)message, "Msgno: %d, Hello, World!", msgno); mesglen = (ub4)strlen((const char *)message); xidgen(&xid, msgno); /* generate an XA xid */ checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart"); checkOCIerr(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *)message, mesglen, &rawmesg)); if (!mesg_tdo) /* get Type descriptor (TDO) for RAW type */ checkOCIerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQADM", strlen("AQADM"), (CONST text *)"RAW", strlen("RAW"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); checkOCIerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"aqsqueue", 0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 0, 0)); checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend"); checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit"); printf("%s Enqueued\n", message); } /* dequeue 1000 messages within one XA transaction */ xidgen(&xid, msgno); /* generate an XA xid */ checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart"); for (msgno = 0; msgno < 1000; msgno++) { checkOCIerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"aqsqueue", 0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 0, 0)); if (ind) printf("Null Raw Message"); else for (i = 0; i < OCIRawSize(envhp, rawmesg); i++) printf("%c", *(OCIRawPtr(envhp, rawmesg) + i)); printf("\n"); } checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend"); checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit"); }
You must set up the following data structures for certain examples to work:
/* Create_types.sql */ CONNECT system/manager GRANT AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE TO scott; CONNECT scott/tiger CREATE TYPE MESSAGE AS OBJECT (id NUMBER, data VARCHAR2(80)); EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'qt', queue_payload_type => 'message'); EXECUTE DBMS_AQADM.CREATE_QUEUE('msgqueue', 'qt'); EXECUTE DBMS_AQADM.START_QUEUE('msgqueue');
The following examples illustrate Oracle Streams AQ memory usage:
Example 2-40 Enqueuing Messages (Free Memory After Every Call) Using OCI
This program, enqnoreuse.c
, dequeues each line of text from a queue 'msgqueue
' that has been created in the scott
schema using create_types.sql
. Messages are enqueued using enqnoreuse.c
or enqreuse.c
(see the following). If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine does not reuse client side objects' memory. It allocates the required memory before dequeue and frees it after the dequeue is complete.
#ifndef OCI_ORACLE #include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void deqmesg(text *buf, ub4 *buflen); OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; static void deqmesg(buf, buflen) text *buf; ub4 *buflen; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ message *mesg = (dvoid *)0; /* instance of SCOTT.MESSAGE */ null_message *mesgind = (dvoid *)0; /* null indicator */ OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = 60; /* timeout after 60 seconds */ ub4 navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Allocate an instance of SCOTT.MESSAGE, and get its null indicator: */ checkerr(errhp, OCIObjectNew(envhp, errhp, svchp, OCI_TYPECODE_OBJECT, mesgtdo, (dvoid *)0, OCI_DURATION_SESSION, TRUE, (dvoid **)&mesg)); checkerr(errhp, OCIObjectGetInd(envhp, errhp, (dvoid *)mesg, (dvoid **)&mesgind)); /* Allocate a descriptor for dequeue options and set wait time, navigation: */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp)); /* Dequeue the message and commit: */ checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", deqopt, 0, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); /* Copy the message payload text into the user buffer: */ if (mesgind->null_data) *buflen = 0; else memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), (size_t)(*buflen = OCIStringSize(envhp, mesg->data))); /* Free the dequeue options descriptor: */ checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, OCI_DTYPE_AQDEQ_OPTIONS)); /* Free the memory for the objects: */ Checkerr(errhp, OCIObjectFree(envhp, errhp, (dvoid *)mesg, OCI_OBJECTFREE_FORCE)); } /* end deqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* payload text */ ub4 buflen; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* Set attribute server context in the service context: */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* Allocate a user context handle: */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); do { deqmesg(buf, &buflen); printf("%.*s\n", buflen, buf); } while(1); } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
Example 2-41 Enqueuing Messages (Reuse Memory) Using OCI
This program, enqreuse.c
, enqueues each line of text into a queue 'msgqueue' that has been created in the scott
schema by executing create_types.sql
. Each line of text entered by the user is stored in the queue until user enters EOF
. In this program the enqueue subroutine reuses the memory for the message payload, as well as the Oracle Streams AQ message properties descriptor.
#ifndef OCI_ORACLE #include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void enqmesg(ub4 msgno, text *buf); struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; /* Global data reused on calls to enqueue: */ OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; message msg; null_message nmsg; OCIAQMsgProperties *msgprop; static void enqmesg(msgno, buf) ub4 msgno; text *buf; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ message *mesg = &msg; /* instance of SCOTT.MESSAGE */ null_message *mesgind = &nmsg; /* null indicator */ text corrid[128]; /* correlation identifier */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Fill in the attributes of SCOTT.MESSAGE: */ checkerr(errhp, OCINumberFromInt(errhp, &msgno, sizeof(ub4), 0, &mesg->id)); checkerr(errhp, OCIStringAssignText(envhp, errhp, buf, strlen(buf), &mesg->data)); mesgind->null_adt = mesgind->null_id = mesgind->null_data = 0; /* Set the correlation id in the message properties descriptor: */ sprintf((char *)corrid, "Msg#: %d", msgno); checkerr(errhp, OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)&corrid, strlen(corrid), OCI_ATTR_CORRELATION, errhp)); /* Enqueue the message and commit: */ checkerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"msgqueue", 0, msgprop, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); } /* end enqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* user supplied text */ int msgno = 0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* Set attribute server context in the service context: */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* Allocate a user context handle: */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* Allocate a message properties descriptor to fill in correlation ID :*/ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0)); do { printf("Enter a line of text (max 80 chars):"); if (!gets((char *)buf)) break; enqmesg((ub4)msgno++, buf); } while(1); /* Free the message properties descriptor: */ checkerr(errhp, OCIDescriptorFree((dvoid *)msgprop, OCI_DTYPE_AQMSG_PROPERTIES)); } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
Example 2-42 Dequeuing Messages (Free Memory After Every Call) Using OCI
This program, deqnoreuse.c
, dequeues each line of text from a queue 'msgqueue
' that has been created in the scott
schema by executing create_types.sql
. Messages are enqueued using enqnoreuse
or enqreuse
. If there are no messages, then it waits for 60 seconds before timing out. In this program the dequeue subroutine does not reuse client side objects' memory. It allocates the required memory before dequeue and frees it after the dequeue is complete.
#ifndef OCI_ORACLE #include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void deqmesg(text *buf, ub4 *buflen); OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; static void deqmesg(buf, buflen) text *buf; ub4 *buflen; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ message *mesg = (dvoid *)0; /* instance of SCOTT.MESSAGE */ null_message *mesgind = (dvoid *)0; /* null indicator */ OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = 60; /* timeout after 60 seconds */ ub4 navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Allocate an instance of SCOTT.MESSAGE, and get its null indicator: */ checkerr(errhp, OCIObjectNew(envhp, errhp, svchp, OCI_TYPECODE_OBJECT, mesgtdo, (dvoid *)0, OCI_DURATION_SESSION, TRUE, (dvoid **)&mesg)); checkerr(errhp, OCIObjectGetInd(envhp, errhp, (dvoid *)mesg, (dvoid **)&mesgind)); /* Allocate a descriptor for dequeue options and set wait time, navigation: */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp)); /* Dequeue the message and commit: */ checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", deqopt, 0, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); /* Copy the message payload text into the user buffer: */ if (mesgind->null_data) *buflen = 0; else memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), (size_t)(*buflen = OCIStringSize(envhp, mesg->data))); /* Free the dequeue options descriptor: */ checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, OCI_DTYPE_AQDEQ_OPTIONS)); /* Free the memory for the objects: */ checkerr(errhp, OCIObjectFree(envhp, errhp, (dvoid *)mesg, OCI_OBJECTFREE_FORCE)); } /* end deqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* payload text */ ub4 buflen; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* Set attribute server context in the service context: */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* Allocate a user context handle: */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); do { deqmesg(buf, &buflen); printf("%.*s\n", buflen, buf); } while(1); } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
Example 2-43 Dequeuing Messages (Reuse Memory) Using OCI
This program, deqreuse.c
, dequeues each line of text from a queue 'msgqueue
' that has been created in the scott
schema by executing create_types.sql
. Messages are enqueued using enqnoreuse.c
or enqreuse.c
. If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine reuses client side objects' memory between invocation of OCIAQDeq
.
During the first call to OCIAQDeq
, OCI automatically allocates the memory for the message payload.
During subsequent calls to OCIAQDeq
, the same payload pointers are passed and OCI automatically resizes the payload memory if necessary.
#ifndef OCI_ORACLE
#include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void deqmesg(text *buf, ub4 *buflen); struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; /* Global data reused on calls to enqueue: */ OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; OCIAQDeqOptions *deqopt; message *mesg = (message *)0; null_message *mesgind = (null_message *)0; static void deqmesg(buf, buflen) text *buf; ub4 *buflen; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ ub4 wait = 60; /* timeout after 60 seconds */ ub4 navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Set wait time, navigation in dequeue options: */ checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp)); /* * Dequeue the message and commit. The memory for the payload is * automatically allocated/resized by OCI: */ checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", deqopt, 0, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); /* Copy the message payload text into the user buffer: */ if (mesgind->null_data) *buflen = 0; else memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), (size_t)(*buflen = OCIStringSize(envhp, mesg->data))); } /* end deqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* payload text */ ub4 buflen; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* set attribute server context in the service context */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* allocate the dequeue options descriptor */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); do { deqmesg(buf, &buflen); printf("%.*s\n", buflen, buf); } while(1); /* * This program never reaches this point as the dequeue times out & exits. * If it does reach here, it is a good place to free the dequeue * options descriptor using OCIDescriptorFree and free the memory allocated * by OCI for the payload using OCIObjectFree */ } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
The following lists Oracle Streams AQ installation and general questions:
See Chapter 17, " Internet Access to Oracle Streams AQ" for a full discussion. The following summarizes the steps required to set up Internet access for Oracle Streams AQ queues:
Set up the Oracle Streams AQ servlet: If you are using a servlet execution engine that supports the Java Servlet 2.2 specification (such as Tomcat), then you must create a servlet that extends the oracle.AQ.xml.AQxmlServlet
class. If you are using a servlet execution engine that supports the Java Servlet 2.0 specification (like Apache Jserv), then you must create a servlet that extends the oracle.AQ.xml.AQxmlServlet20
class. Implement the init()
method in the servlet to specify database connection parameters.
Set up user authentication: Configure the Web server to authenticate all the users that send POST
requests to the Oracle Streams AQ servlet. Only authenticated users are allowed to access the Oracle Streams AQ servlet.
Set up user authorization: Register the Oracle Streams AQ agent name that is used to perform Oracle Streams AQ operations using DBMS_AQADM.CREATE_AQ_AGENT.
Map the agent to the database users using DBMS_AQADM.ENABLE_DB_ACCESS.
Now clients can write Simple Object Access Protocol (SOAP) requests and send them to the Oracle Streams AQ servlet using HTTP POST.
Here are the steps for setting up your database for e-mail notifications:
Set the SMTP mail host: Invoke DBMS_AQELM.SET_MAILHOST
as an Oracle Streams AQ administrator.
Set the SMTP mail port: Invoke DBMS_AQELM.SET_MAILPORT
as an Oracle Streams AQ administrator. If not explicit, set defaults to 25.
Set the SendFrom address: Invoke DBMS_AQELM.SET_SENDFROM
.
After setup, you can register for e-mail notifications using the Oracle Call Interface (OCI) or PL/SQL API.
See Chapter 17, " Internet Access to Oracle Streams AQ" for a full discussion. In summary, follow the steps for setting up Internet access for Oracle Streams AQ. The destination databases must be set up for Internet access, as follows:
At the source database, create the database link with protocol as http, and host and port of the Web server running the Oracle Streams AQ servlet with the username password for authentication with the Web server/servlet runner. For example, if the Web server is running on computer webdest.oracle.com
and listening for requests on port 8081, then the connect string of the database is:
(DESCRIPTION=(ADDRESS=(PROTOCOL=http)(HOST=webdest.oracle.com)(PORT=8081))
If SSL is used, then specify https as the protocol in the connect string. The database link is created as follows:
create public database link propdb connect to john IDENTIFIED BY welcome using '(DESCRIPTION=(ADDRESS=(PROTOCOL=http) (HOST=webdest.oracle.com) (PORT=8081))';
where user John
with password Welcome
is used to authenticate with the Web server, and is also known by the term Oracle Streams AQ HTTP agent.
Note: You cannot usenet_service_name in tnsnames.ora with the database link. Doing so results in error ORA-12538. |
If SSL is used, then create an Oracle wallet and specify the wallet path at the source database:
EXECUTE DBMS_AQADM.SET_AQ_PROPAGATIONWALLET('/home/myuid/cwallet.sso', 'welcome');
Deploy the Oracle Streams AQ servlet at the destination database: Create a class AQPropServlet
that extends oracle.AQ.xml.AQxmlServlet20
(if you are using a Servlet 2.0 execution engine like Apache Jserv) or extends oracle.AQ.xml.AQxmlServlet
(if you are using a Servlet 2.2 execution engine like Tomcat). This servlet must connect to the destination database. The servlet must be deployed on the Web server in the path aqserv/servlet
.
Note: In Oracle9i, the propagation servlet name and deployment path are fixed. That is, they must be AQPropServlet and the aqserv/servlet respectively. |
At the destination database: Set up the authorization and authentication for the Internet user performing propagation, in this case, John.
Start propagation at the source site by calling:
DBMS_AQADM.SCHEDULE_PROPAGATION('src_queue', 'propdb').
Access messages using SQL. Messages in the queue table (either because they are being retained or because they have not yet been processed). Each queue has a view that you can use (see "Number of Messages in Different States for the Whole Database View" ).
Typically we expect the subscriber to access the messages using the dequeue interface. If, however, you would like to see processed or waiting messages, then you can either dequeue by message ID or use SQL.
You cannot change the sort order for messages after you have created the queue table.
The exception queue for a multiconsumer queue must also be a multiconsumer queue.
Expired messages in multiconsumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE
mode once (and only once) using a NULL
consumer name in dequeue options. Messages can also be dequeued from an exception queue by specifying the message ID.
Expired messages can be dequeued only by specifying message ID if the multiconsumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'
If a latency less than 0 was specified in the propagation schedule, then the job is rescheduled to run after the specified latency. The time at which the job actually runs depends on other factors, such as the number of ready jobs and the number of job_queue_processes.
See Also: "Managing Job Queues" in Oracle Database Administrator's Guide for more information on job queues and Jnnn background processes |
You can pick a tablespace for storing the queue table and all its ancillary objects using the storage_clause
parameter in DBMS_AQADM.CREATE_QUEUE_TABLE.
However, once you pick the tablespace, any index-organized table (IOT) or index created for that queue table goes to the specified tablespace. Currently, you do not have a choice to split them between different tablespaces.
In 8.1 you can associate RAC instance affinities with queue tables. If you are using q1
and q2
in different instances, then you can use alter_queue_table
(or even create queue table) on the queue table and set the primary_instance
to the appropriate instance_id.
Yes, here is a simple rule that specifies message properties: rule = 'priority = 1'; here are example rules that specify a combination of message properties and data attributes: rule = 'priority = 1 AND tab.userdata.sal = 1000' rule = '((priority between 0 AND 3)
OR correlation = "BACK_ORDERS")
AND tab.userdata.customer_name
like ''JOHN DOE'')'
User data properties or attributes apply only to object payloads and must be prefixed with tab.userdata in all cases. Check documentation for more examples.
No. Registration is an OCI client call to be used for asynchronous notifications (that is, push). It provides a notification from the server to the client when a message is available for dequeue. A client side function (callback) is invoked by the server when the message is available. Registration for notification is both nonblocking and nonpolling.
To provide a mechanism for notification to all users that are currently connected. The nonpersistent queue mechanism supports the enqueue of a message to a nonpersistent queue and OCI notifications are used to deliver such messages to users that are currently registered for notification.
Yes, 1024 subscribers or recipients for any queue.
You can dequeue these messages by msgid.
You can find the msgid
by querying the queue table view. Eventually the messages are moved to the exception queue (you must have the Oracle Streams AQ Queue Monitor Process running for this to happen). You can dequeue these messages from the exception queue with a usual dequeue.
Only by dequeuing and enqueuing the message again. If you are changing the message payload, then it is a different message.
Notification is possible only to OCI clients. The client need not be connected to the database to receive notifications. The client specifies a callback function which is executed for each message. Asynchronous Notification cannot be used to invoke an executable, but it is possible for the callback function to invoke a stored procedure.
Propagation from a multiconsumer queue to a single consumer queue is possible. The reverse is not possible (propagation is not possible from a single consumer queue).
You are probably using the NEXT_MESSAGE
navigation option for dequeue. This uses the snapshot created during the first dequeue call. After that, undo information may not be retained.
The workaround is to use the FIRST_MESSAGE
option to dequeue the message. This reexecutes the cursor and gets a new snapshot. This might not perform as well, so we suggest you dequeue them in batches: FIRST_MESSAGE
for one, and NEXT_MESSAGE
for the next, say, 1000 messages, and then FIRST_MESSAGE
again, and so on.
No, Oracle Streams AQ does not provide this information. To get around this, the application could save this information in the message.
When the enq_time
is the same for messages, there is another field called step_no that is monotonically increasing (for each message that has the same enq_time
). Hence this helps in maintaining the order of the messages. There is no situation when both enq_time
and step_no
are the same for more than one message enqueued from the same session.
In Oracle9i and higher, OMB functionality is provided in Oracle Database. If you are using Oracle9i or higher database, then use the functionality offered by the database. You do not need OMB. Note also that from Oracle9i release 2 (9.2) Oracle Messaging Gateway (MGW) provides the OMB functionality.
With Oracle8i, use MGW in the following scenarios:
To integrate with Websphere MQ
To use HTTP framework
Use Java Message Service (JMS) functionality directly from the database in other scenarios.
Yes, you can specify a security policy with Oracle Streams AQ queue tables. While dequeuing, use the dequeue condition (deq_cond
) or the correlation ID for the policy to be applied. You can use "1=1" as the dequeue condition. If you do not use a dequeue condition or correlation ID, then the dequeue results in an error.
The Oracle Streams AQ retention feature can be used to automatically clean up messages after the user-specified duration after consumption.
You can do a dequeue with the subscriber name or by message ID. This consumes the messages, which are cleaned up after their retention time expires.
To clean up messages for a particular subscriber, you can remove the subscriber and add the subscriber again. Removing the subscriber removes all the messages for that subscriber.
Transformation of XML data can be accomplished in one of the following ways:
Using the extract()
method supported on XMLType
to return an object of XMLType
after applying the supplied XPath
expression
Creating a PL/SQL function that transforms the XMLType
object by applying an XSLT transformation to it, using the package XSLPROCESSOR