Oracle® Streams Advanced Queuing User's Guide and Reference Release 10.1 Part Number B10785-01
This chapter describes how to use and manage Oracle Streams AQ when staging and propagating events. It describes SYS.AnyData queues and user messages.
Oracle Streams uses queues of type SYS.AnyData
to stage events. There are two types of events that can be staged in an Oracle Streams queue:
Logical change records (LCRs). LCRs are objects that contain information about a change to a database object.
User messages. These are custom messages created by users or applications.
Both types of events are of type SYS.AnyData
and can be used for information sharing within a single database or between databases.
Staged events can be consumed or propagated, or both. These events can be consumed by an apply process or by a user application that explicitly dequeues them. Even after an event is consumed, it can remain in the queue if you have also configured Oracle Streams to propagate the event to one or more other queues or if message retention is specified. These other queues can reside in the same database or in different databases. In either case, the queue from which the events are propagated is called the source queue, and the queue that receives the events is called the destination queue.
Oracle Streams enables messaging with queues of type SYS.AnyData. SYS.AnyData queues can stage user messages whose payloads are of SYS.AnyData type. A SYS.AnyData payload can be a wrapper for payloads of different datatypes. Queues that can stage messages of only a particular type are called typed queues.
By using SYS.AnyData wrappers for message payloads, publishing applications can enqueue messages of different types into a single queue. Subscribing applications can then dequeue these messages, either explicitly using a dequeue API or implicitly using an apply process. If the subscribing application is remote, then the messages can be propagated to the remote site, and the subscribing application can dequeue the messages from a local queue in the remote database. Alternatively, a remote subscribing application can dequeue messages directly from the source queue using a variety of standard programmatic interfaces, such as PL/SQL and Oracle Call Interface (OCI).
Oracle Streams interoperates with Oracle Streams AQ, which supports all the standard features of message queuing systems, including multiconsumer queues, publish and subscribe, content-based routing, internet propagation, transformations, and gateways to other messaging subsystems.
You can wrap almost any type of payload in a SYS.AnyData payload. To do this, you use the Convertdata_type static functions of the SYS.AnyData type, where data_type is the type of object to wrap (for example, ConvertVarchar2, ConvertNumber, or ConvertObject). These functions take the object as input and return a SYS.AnyData object.
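As a rough illustration of the conversion functions described above, the following anonymous block wraps a VARCHAR2 value and a NUMBER value and prints the type name recorded in each wrapper. It is a minimal sketch that assumes only a SQL*Plus session with server output enabled; the variable names are illustrative.

```sql
SET SERVEROUTPUT ON

DECLARE
  wrapped_text   SYS.AnyData;  -- illustrative variable names
  wrapped_number SYS.AnyData;
BEGIN
  -- Each Convert function takes a value and returns a SYS.AnyData wrapper.
  wrapped_text   := SYS.AnyData.ConvertVarchar2('Chemicals - SW');
  wrapped_number := SYS.AnyData.ConvertNumber(16);

  -- GetTypeName reports the type of the value held in each wrapper.
  DBMS_OUTPUT.PUT_LINE(wrapped_text.GetTypeName());
  DBMS_OUTPUT.PUT_LINE(wrapped_number.GetTypeName());
END;
/
```

No queue is involved at this point; the wrapper is an ordinary object value that can later be passed as the payload of an enqueue call.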
The following datatypes cannot be wrapped in a SYS.AnyData
wrapper:
Nested table
ROWID and UROWID
The following datatypes can be directly wrapped in a SYS.AnyData
wrapper, but these datatypes cannot be present in a user-defined type payload wrapped in a SYS.AnyData
wrapper:
Your applications can use the following programmatic environments to enqueue user messages into a SYS.AnyData
queue and dequeue user messages from a SYS.AnyData
queue:
PL/SQL (DBMS_AQ
package)
Java Message Service (JMS)
OCI
The following sections provide information about using these interfaces to enqueue user messages into and dequeue user messages from a SYS.AnyData
queue.
See Also: Chapter 4, "Oracle Streams AQ: Programmatic Environments" for more information about these programmatic interfaces
To enqueue a user message containing an LCR into a SYS.AnyData
queue using PL/SQL, first create the LCR to be enqueued. You use the constructor for the SYS.LCR$_ROW_RECORD
type to create a row LCR, and you use the constructor for the SYS.LCR$_DDL_RECORD
type to create a DDL LCR. Then you use the SYS.AnyData.ConvertObject
function to convert the LCR into SYS.AnyData
payload and enqueue it using the DBMS_AQ.ENQUEUE
procedure.
To enqueue a user message containing a non-LCR object into a SYS.AnyData
queue using PL/SQL, you use one of the SYS.AnyData.Convert*
functions to convert the object into SYS.AnyData
payload and enqueue it using the DBMS_AQ.ENQUEUE
procedure.
To enqueue a user message containing an LCR into a SYS.AnyData
queue using JMS or OCI, you must represent the LCR in XML format. To construct an LCR, use the oracle.xdb.XMLType
class. LCRs are defined in the SYS
schema. The LCR schema must be loaded into the SYS
schema using the catxlcr.sql
script in Oracle home in the rdbms/admin/
directory.
To enqueue a message using OCI, perform the same actions that you would to enqueue a message into a typed queue. A typed queue is a queue that can stage messages of a particular type only. To enqueue a message using JMS, a user must have EXECUTE privilege on the DBMS_AQ, DBMS_AQIN, and DBMS_AQJMS packages.
Note: Enqueue of JMS types and XML types does not work with Oracle Streams SYS.AnyData queues unless you call DBMS_AQADM.ENABLE_JMS_TYPES(queue_table_name) after DBMS_STREAMS_ADM.SET_UP_QUEUE(). Enabling an Oracle Streams queue for these types may affect import/export of the queue table.
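The call order in the preceding note might look like the following sketch. The queue table and queue names (jms_q_table_any and jms_q_any) are hypothetical, and the block assumes it is run by the Oracle Streams administrator who will own the queue table; the argument to ENABLE_JMS_TYPES is passed positionally, as in the note.

```sql
BEGIN
  -- Create the SYS.AnyData queue first (hypothetical names).
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'jms_q_table_any',
    queue_name  => 'jms_q_any');

  -- Then enable JMS and XML types on the queue table that was just created.
  DBMS_AQADM.ENABLE_JMS_TYPES('jms_q_table_any');
END;
/
```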
A non-LCR user message can be a message of any user-defined type or a JMS type. The JMS types include the following:
javax.jms.TextMessage
javax.jms.MapMessage
javax.jms.StreamMessage
javax.jms.ObjectMessage
javax.jms.BytesMessage
When using user-defined types, you must use JPublisher to generate a Java class for the message that implements the ORAData interface. To enqueue a message into a SYS.AnyData queue, you can use the QueueSender.send or TopicPublisher.publish methods.
To dequeue a user message from a SYS.AnyData queue using PL/SQL, you use the DBMS_AQ.DEQUEUE procedure and specify SYS.AnyData as the payload. The user message can contain an LCR or another type of object.
In a SYS.AnyData queue, user messages containing LCRs in XML format are represented as oracle.xdb.XMLType. Non-LCR messages can be one of the following formats:
A JMS type (javax.jms.TextMessage, javax.jms.MapMessage, javax.jms.StreamMessage, javax.jms.ObjectMessage, or javax.jms.BytesMessage)
A user-defined type
To dequeue a message from a SYS.AnyData queue using JMS, you can use a QueueReceiver, TopicSubscriber, or TopicReceiver. Because the queue can contain different types of objects wrapped in a SYS.AnyData wrapper, you must register a list of SQL types and their corresponding Java classes in the typemap of the JMS session. JMS types are already preregistered in the typemap.
For example, suppose a queue contains LCR messages represented as oracle.xdb.XMLType and messages of type person and address. The classes JPerson.java and JAddress.java are the ORAData mappings for person and address, respectively. Before dequeuing the message, the type map must be populated as follows:
java.util.Map map = ((AQjmsSession)q_sess).getTypeMap();
map.put("SCOTT.PERSON", Class.forName("JPerson"));
map.put("SCOTT.ADDRESS", Class.forName("JAddress"));
map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType")); // For LCRs
When using message selectors with a QueueReceiver or TopicSubscriber, the selector can contain any SQL92 expression that has a combination of one or more of the following:
JMS message header fields or properties, including JMSPriority, JMSCorrelationID, JMSType, JMSXUserID, JMSXAppID, JMSXGroupID, and JMSXGroupSeq. The following is an example of a selector on JMS message header fields:
JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
User-defined message properties, as in the following example:
color IN ('RED', 'BLUE', 'GREEN') AND price < 30000
PL/SQL functions, as in the following example:
hr.GET_TYPE(tab.user_data) = 'HR.EMPLOYEES'
To dequeue a message using OCI, perform the same actions that you would to dequeue a message from a typed queue.
SYS.AnyData
queues can interoperate with typed queues in an Oracle Streams environment. A typed queue can stage messages of a particular type only. Table 23-1 shows the types of propagation possible between queues.
Table 23-1 Propagation Between Different Types of Queues
Source Queue | Destination Queue | Transformation |
---|---|---|
SYS.AnyData | SYS.AnyData | None |
Typed | SYS.AnyData | Implicit. Note: Propagation is possible only if the messages in the typed queue meet the restrictions outlined in "User-Defined Type Messages". |
SYS.AnyData | Typed | Requires a rule to filter messages and a user-defined transformation |
Typed | Typed | Follows Oracle Streams AQ rules |
To propagate messages containing a payload of a certain type from a SYS.AnyData
source queue to a typed destination queue, you must perform a transformation. Only messages containing a payload of the same type as the typed queue can be propagated to the typed queue.
Although you cannot use Simple Object Access Protocol (SOAP) to interact directly with a SYS.AnyData queue, you can use SOAP with Oracle Streams by propagating messages between a SYS.AnyData queue and a typed queue. If you want to enqueue a message into a SYS.AnyData queue using SOAP, then you can configure propagation from a typed queue to a SYS.AnyData queue. Then, you can use SOAP to enqueue a message into the typed queue. The message is propagated automatically from the typed queue to the SYS.AnyData queue.
If you want to use SOAP to dequeue a message that is in a SYS.AnyData queue, then you can configure propagation from a SYS.AnyData queue to a typed queue. The message is propagated automatically from the SYS.AnyData queue to the typed queue, where it is available for access using SOAP.
Note: Certain Oracle Streams capabilities, such as capturing changes using a capture process and applying changes with an apply process, can be configured only with SYS.AnyData queues.
See Also: Oracle Streams Concepts and Administration, "Propagating Messages Between a SYS.AnyData Queue and a Typed Queue"
If you plan to enqueue, propagate, or dequeue user-defined type messages in an Oracle Streams environment, then each type used in these messages must exist at every database where the message can be staged in a queue. Some environments use directed networks to route messages through intermediate databases before they reach their destination. In such environments, the type must exist at each intermediate database, even if the messages of this type are never enqueued or dequeued at a particular intermediate database.
In addition, the following requirements must be met for such types:
The type name must be the same at each database.
The type must be in the same schema at each database.
The shape of the type must match exactly at each database.
The type cannot use inheritance or type evolution at any database.
The type cannot contain varrays, nested tables, LOBs, rowids, or urowids.
The object identifier need not match at each database.
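One way to check the name, schema, and shape requirements listed above is to compare the attribute definitions of a type across databases. The following query is a sketch under several assumptions: it uses the oe.cust_address_typ type from the examples later in this chapter, it is run at dbs1.net, and a database link named dbs2.net already exists and points to the second database.

```sql
-- Attributes defined at the local database that are missing or
-- defined differently at the database reached through the dbs2.net link.
SELECT attr_no, attr_name, attr_type_name, length, precision, scale
  FROM all_type_attrs
 WHERE owner = 'OE'
   AND type_name = 'CUST_ADDRESS_TYP'
MINUS
SELECT attr_no, attr_name, attr_type_name, length, precision, scale
  FROM all_type_attrs@dbs2.net
 WHERE owner = 'OE'
   AND type_name = 'CUST_ADDRESS_TYP';
```

MINUS reports differences in one direction only, so the comparison should also be run with the two SELECT statements swapped. An empty result in both directions suggests that the attribute lists match; it does not check inheritance or type evolution.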
Oracle Streams enables messaging with queues of type SYS.AnyData. These queues stage user messages whose payloads are of SYS.AnyData type, and a SYS.AnyData payload can be a wrapper for payloads of different datatypes.
This section provides instructions for completing the following tasks:
Propagating Messages Between a SYS.AnyData Queue and a Typed Queue
Note: The examples in this section assume that you have configured an Oracle Streams administrator at each database.
You can wrap almost any type of payload in a SYS.AnyData
payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, a SYS.AnyData
queue.
Example 23-1 Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It
The following steps illustrate how to wrap payloads of various types in a SYS.AnyData
payload.
Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net
database.
Grant EXECUTE
privilege on the DBMS_AQ
package to the oe
user so that this user can run the ENQUEUE
and DEQUEUE
procedures in that package:
GRANT EXECUTE ON DBMS_AQ TO oe;
Connect as the Oracle Streams administrator, as in the following example:
CONNECT strmadmin/strmadminpw@dbs1.net
Create a SYS.AnyData
queue if one does not already exist.
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'oe_q_table_any',
    queue_name  => 'oe_q_any',
    queue_user  => 'oe');
END;
/
The oe
user is configured automatically as a secure user of the oe_q_any
queue and is given ENQUEUE
and DEQUEUE
privileges on the queue.
Add a subscriber to the oe_q_any queue. This subscriber performs explicit dequeues of events. The ADD_SUBSCRIBER procedure will automatically create an AQ_AGENT.
DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'strmadmin.oe_q_any',
    subscriber => subscriber);
END;
/
Grant the oe user enqueue and dequeue privileges on queue strmadmin.oe_q_any.
BEGIN
  DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
    privilege  => 'ALL',
    queue_name => 'strmadmin.oe_q_any',
    grantee    => 'oe');
END;
/
Associate the oe
user with the local_agent
agent:
BEGIN
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'local_agent',
    db_username => 'oe');
END;
/
CONNECT oe/oe@dbs1.net
Create a procedure that takes as an input parameter an object of SYS.AnyData
type and enqueues a message containing the payload into an existing SYS.AnyData
queue.
CREATE OR REPLACE PROCEDURE oe.enq_proc (payload SYS.AnyData) IS
  enqopt    DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop     DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid RAW(16);
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.oe_q_any',
    enqueue_options    => enqopt,
    message_properties => mprop,
    payload            => payload,
    msgid              => enq_msgid);
END;
/
Run the procedure you created in the previous step by specifying the appropriate Convertdata_type function. The following commands enqueue messages of various types.
VARCHAR2
type:
EXEC oe.enq_proc(SYS.AnyData.ConvertVarchar2('Chemicals - SW'));
COMMIT;
NUMBER
type:
EXEC oe.enq_proc(SYS.AnyData.ConvertNumber(16));
COMMIT;
User-defined type:
BEGIN
  oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
    '1646 Brazil Blvd', '361168', 'Chennai', 'Tam', 'IN')));
END;
/
COMMIT;
See Also: Oracle Streams Concepts and Administration, "Viewing the Contents of User-Enqueued Events in a Queue" for information about viewing the contents of these enqueued messages
Example 23-2 Example of Dequeuing a Payload That Is Wrapped in a SYS.AnyData Payload
The following steps illustrate how to dequeue a payload wrapped in a SYS.AnyData
payload. This example assumes that you have completed the steps in "Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It".
To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name view, where queue_table_name is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any queue, run the following query:
CONNECT strmadmin/strmadminpw@dbs1.net
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
Connect as the oe
user:
CONNECT oe/oe@dbs1.net
Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ
and prints the contents of the messages.
CREATE OR REPLACE PROCEDURE oe.get_cust_address (
  consumer IN VARCHAR2) AS
  address       OE.CUST_ADDRESS_TYP;
  deq_address   SYS.AnyData;
  msgid         RAW(16);
  deqopt        DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop         DBMS_AQ.MESSAGE_PROPERTIES_T;
  new_addresses BOOLEAN := TRUE;
  next_trans    EXCEPTION;
  no_messages   EXCEPTION;
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
  num_var       pls_integer;
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_addresses) LOOP
    BEGIN
      DBMS_AQ.DEQUEUE(
        queue_name         => 'strmadmin.oe_q_any',
        dequeue_options    => deqopt,
        message_properties => mprop,
        payload            => deq_address,
        msgid              => msgid);
      deqopt.navigation := DBMS_AQ.NEXT;
      DBMS_OUTPUT.PUT_LINE('****');
      IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
        DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName());
        num_var := deq_address.GetObject(address);
        DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
        DBMS_OUTPUT.PUT_LINE(address.street_address);
        DBMS_OUTPUT.PUT_LINE(address.postal_code);
        DBMS_OUTPUT.PUT_LINE(address.city);
        DBMS_OUTPUT.PUT_LINE(address.state_province);
        DBMS_OUTPUT.PUT_LINE(address.country_id);
      ELSE
        DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName());
      END IF;
      COMMIT;
    EXCEPTION
      WHEN next_trans THEN
        deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
      WHEN no_messages THEN
        new_addresses := FALSE;
        DBMS_OUTPUT.PUT_LINE('No more messages');
    END;
  END LOOP;
END;
/
Run the procedure you created in the previous step and specify the consumer of the messages you want to dequeue, as in the following example:
SET SERVEROUTPUT ON SIZE 100000
EXEC oe.get_cust_address('LOCAL_AGENT');
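The dequeue procedure in this example extracts only oe.cust_address_typ payloads and prints just the type name for anything else. For the VARCHAR2 and NUMBER messages enqueued earlier, the value could be read out of the wrapper with the access functions of the SYS.AnyData type. The following standalone block is a sketch of that branching; it builds its own wrapper instead of dequeuing one, and it assumes that GetTypeName returns 'SYS.VARCHAR2' and 'SYS.NUMBER' for these built-in types.

```sql
SET SERVEROUTPUT ON

DECLARE
  -- Stand-in for a payload returned by DBMS_AQ.DEQUEUE.
  payload SYS.AnyData := SYS.AnyData.ConvertNumber(16);
BEGIN
  CASE payload.GetTypeName()
    WHEN 'SYS.VARCHAR2' THEN
      DBMS_OUTPUT.PUT_LINE('Text: ' || payload.AccessVarchar2());
    WHEN 'SYS.NUMBER' THEN
      DBMS_OUTPUT.PUT_LINE('Number: ' || TO_CHAR(payload.AccessNumber()));
    ELSE
      -- Object types need GetObject with a variable of the matching type.
      DBMS_OUTPUT.PUT_LINE('Unhandled type: ' || payload.GetTypeName());
  END CASE;
END;
/
```

The same CASE statement could replace the ELSE branch of a dequeue loop so that scalar messages are printed rather than skipped.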
SYS.AnyData
queues can interoperate with typed queues in an Oracle Streams environment. A typed queue is a queue that can stage messages of a particular type only. To propagate a message from a SYS.AnyData
queue to a typed queue, the message must be transformed to match the type of the typed queue. The following sections provide examples of propagating non-LCR user messages and LCRs between a SYS.AnyData
queue and a typed queue.
Note: The examples in this section assume that you have completed the examples in "SYS.AnyData Wrapper for User Messages Payloads".
See Also: "Message Propagation and SYS.AnyData Queues" for more information about propagation between SYS.AnyData and typed queues
Example 23-3 Example of Propagating Non-LCR User Messages to a Typed Queue
The following steps set up propagation from a SYS.AnyData queue named oe_q_any to a typed queue of type oe.cust_address_typ named oe_q_address. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_address is at the dbs2.net database. Both queues are owned by strmadmin.
Connect as an administrative user who can grant privileges at dbs1.net.
Grant the following privilege to strmadmin, if it was not already granted.
GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
Grant strmadmin EXECUTE privilege on oe.cust_address_typ at dbs1.net and dbs2.net.
CONNECT oe/oe@dbs1.net
GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
CONNECT oe/oe@dbs2.net
GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
Create a typed queue at dbs2.net
, if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs2.net
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'strmadmin.oe_q_table_address',
    queue_payload_type => 'oe.cust_address_typ',
    multiple_consumers => true);
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'strmadmin.oe_q_address',
    queue_table => 'strmadmin.oe_q_table_address');
  DBMS_AQADM.START_QUEUE(
    queue_name => 'strmadmin.oe_q_address');
END;
/
Create a database link between dbs1.net
and dbs2.net
if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs1.net
CREATE DATABASE LINK dbs2.net CONNECT TO strmadmin
  IDENTIFIED BY strmadminpw USING 'DBS2.NET';
Create a function called any_to_cust_address_typ
in the strmadmin
schema at dbs1.net
that takes a SYS.AnyData
payload containing a oe.cust_address_typ
object and returns the oe.cust_address_typ
object.
CREATE OR REPLACE FUNCTION strmadmin.any_to_cust_address_typ(
  in_any IN SYS.AnyData)
  RETURN OE.CUST_ADDRESS_TYP
AS
  address   OE.CUST_ADDRESS_TYP;
  num_var   NUMBER;
  type_name VARCHAR2(100);
BEGIN
  -- Get the type of object
  type_name := in_any.GetTypeName();
  -- Check if the object type is OE.CUST_ADDRESS_TYP
  IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
    -- Put the address in the message into the address variable
    num_var := in_any.GetObject(address);
    RETURN address;
  ELSE
    raise_application_error(-20101, 'Conversion failed - ' || type_name);
  END IF;
END;
/
Create a transformation at dbs1.net
using the DBMS_TRANSFORM
package.
BEGIN
  DBMS_TRANSFORM.CREATE_TRANSFORMATION(
    schema         => 'strmadmin',
    name           => 'anytoaddress',
    from_schema    => 'SYS',
    from_type      => 'ANYDATA',
    to_schema      => 'oe',
    to_type        => 'cust_address_typ',
    transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)');
END;
/
Create a subscriber for the typed queue if one does not already exist. The subscriber must contain a rule that ensures that only messages of the appropriate type are propagated to the destination queue.
DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE',
                               'STRMADMIN.OE_Q_ADDRESS@DBS2.NET',
                               0);
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name     => 'strmadmin.oe_q_any',
    subscriber     => subscriber,
    rule           => 'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''',
    transformation => 'strmadmin.anytoaddress');
END;
/
Schedule propagation between the SYS.AnyData queue at dbs1.net and the typed queue at dbs2.net.
BEGIN
  DBMS_AQADM.SCHEDULE_PROPAGATION(
    queue_name  => 'strmadmin.oe_q_any',
    destination => 'dbs2.net');
END;
/
Enqueue a message of oe.cust_address_typ
type wrapped in a SYS.AnyData
wrapper:
CONNECT oe/oe@dbs1.net
BEGIN
  oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
    '1668 Chong Tao', '111181', 'Beijing', NULL, 'CN')));
END;
/
COMMIT;
After allowing some time for propagation, query the queue table at dbs2.net
to view the propagated message:
CONNECT strmadmin/strmadminpw@dbs2.net
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ADDRESS;
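If the query at dbs2.net returns no rows after a reasonable wait, the propagation schedule at the source database is a natural place to look. The following query is a sketch that assumes the Oracle Streams administrator can read the DBA_QUEUE_SCHEDULES data dictionary view at dbs1.net.

```sql
CONNECT strmadmin/strmadminpw@dbs1.net

-- One row per propagation schedule for the source queue; a nonzero
-- FAILURES count or a populated LAST_ERROR_MSG points to the problem.
SELECT destination, schedule_disabled, failures, last_error_msg
  FROM dba_queue_schedules
 WHERE schema = 'STRMADMIN'
   AND qname  = 'OE_Q_ANY';
```

Errors raised by the transformation function, such as the "Conversion failed" error in any_to_cust_address_typ, would be expected to surface here rather than in the enqueuing session, because the transformation runs during propagation.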
See Also: Chapter 21, "Oracle Messaging Gateway Message Conversion" for more information about transformations during propagation
Example 23-4 Example of Propagating LCRs to a Typed Queue
To propagate LCRs from a SYS.AnyData
queue to a typed queue, you complete the same steps as you do for non-LCR events, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS
package to transform LCRs in SYS.AnyData
queues to messages in typed queues:
The CONVERT_ANYDATA_TO_LCR_ROW
function transforms SYS.AnyData
payload containing a row LCR into SYS.LCR$_ROW_RECORD
payload.
The CONVERT_ANYDATA_TO_LCR_DDL
function transforms SYS.AnyData
payload containing a DDL LCR into SYS.LCR$_DDL_RECORD
payload.
You can propagate user-enqueued LCRs to an appropriate typed queue, but propagation of captured LCRs to a typed queue is not supported.
The following example sets up propagation of row LCRs from a SYS.AnyData queue named oe_q_any to a typed queue of type SYS.LCR$_ROW_RECORD named oe_q_lcr. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_lcr is at the dbs3.net database.
Connect as an administrative user who can grant privileges at dbs1.net.
Grant the following privilege to strmadmin, if it was not already granted.
GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
Create a queue of the LCR type if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs3.net
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'strmadmin.oe_q_table_lcr',
    queue_payload_type => 'SYS.LCR$_ROW_RECORD',
    multiple_consumers => true);
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'strmadmin.oe_q_lcr',
    queue_table => 'strmadmin.oe_q_table_lcr');
  DBMS_AQADM.START_QUEUE(
    queue_name => 'strmadmin.oe_q_lcr');
END;
/
Create a database link between dbs1.net
and dbs3.net
if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs1.net
CREATE DATABASE LINK dbs3.net CONNECT TO strmadmin
  IDENTIFIED BY strmadminpw USING 'DBS3.NET';
Create a transformation at dbs1.net
using the DBMS_TRANSFORM
package.
BEGIN
  DBMS_TRANSFORM.CREATE_TRANSFORMATION(
    schema         => 'strmadmin',
    name           => 'anytolcr',
    from_schema    => 'SYS',
    from_type      => 'ANYDATA',
    to_schema      => 'SYS',
    to_type        => 'LCR$_ROW_RECORD',
    transformation => 'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)');
END;
/
Create a subscriber at the typed queue if one does not already exist. The subscriber specifies the CONVERT_ANYDATA_TO_LCR_ROW
function for the transformation parameter.
DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber := SYS.AQ$_AGENT (
                  'ROW_LCR_AGENT_REMOTE',
                  'STRMADMIN.OE_Q_LCR@DBS3.NET',
                  0);
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name     => 'strmadmin.oe_q_any',
    subscriber     => subscriber,
    rule           => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''',
    transformation => 'strmadmin.anytolcr');
END;
/
Schedule propagation between the SYS.AnyData queue at dbs1.net and the LCR queue at dbs3.net.
BEGIN
  DBMS_AQADM.SCHEDULE_PROPAGATION(
    queue_name  => 'strmadmin.oe_q_any',
    destination => 'dbs3.net');
END;
/
Create a procedure to construct and enqueue a row LCR into the strmadmin.oe_q_any
queue:
CONNECT oe/oe@dbs1.net
CREATE OR REPLACE PROCEDURE oe.enq_row_lcr_proc(
  source_dbname VARCHAR2,
  cmd_type      VARCHAR2,
  obj_owner     VARCHAR2,
  obj_name      VARCHAR2,
  old_vals      SYS.LCR$_ROW_LIST,
  new_vals      SYS.LCR$_ROW_LIST) AS
  eopt      DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop     DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid RAW(16);
  row_lcr   SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
  -- Construct the LCR based on information passed to procedure
  row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => source_dbname,
    command_type         => cmd_type,
    object_owner         => obj_owner,
    object_name          => obj_name,
    old_values           => old_vals,
    new_values           => new_vals);
  -- Enqueue the created row LCR
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.oe_q_any',
    enqueue_options    => eopt,
    message_properties => mprop,
    payload            => SYS.AnyData.ConvertObject(row_lcr),
    msgid              => enq_msgid);
END enq_row_lcr_proc;
/
Create a row LCR that inserts a row into the oe.inventories
table and enqueue the row LCR into the strmadmin.oe_q_any
queue.
DECLARE
  newunit1 SYS.LCR$_ROW_UNIT;
  newunit2 SYS.LCR$_ROW_UNIT;
  newunit3 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
BEGIN
  newunit1 := SYS.LCR$_ROW_UNIT(
    'PRODUCT_ID',
    SYS.AnyData.ConvertNumber(3503),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit2 := SYS.LCR$_ROW_UNIT(
    'WAREHOUSE_ID',
    SYS.AnyData.ConvertNumber(1),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit3 := SYS.LCR$_ROW_UNIT(
    'QUANTITY_ON_HAND',
    SYS.AnyData.ConvertNumber(157),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1, newunit2, newunit3);
  oe.enq_row_lcr_proc(
    source_dbname => 'DBS1.NET',
    cmd_type      => 'INSERT',
    obj_owner     => 'OE',
    obj_name      => 'INVENTORIES',
    old_vals      => NULL,
    new_vals      => newvals);
END;
/
COMMIT;
After allowing some time for propagation, query the queue table at dbs3.net
to view the propagated message:
CONNECT strmadmin/strmadminpw@dbs3.net
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_LCR;
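Once the row LCR appears in the typed queue, it can be dequeued directly as a SYS.LCR$_ROW_RECORD, with no SYS.AnyData unwrapping. The following block is a sketch that assumes it is run as strmadmin at dbs3.net and that the CONSUMER_NAME reported by the preceding query is ROW_LCR_AGENT_REMOTE, the subscriber name used when propagation was configured; substitute the reported name if it differs.

```sql
SET SERVEROUTPUT ON

DECLARE
  deqopt      DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop       DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid       RAW(16);
  row_lcr     SYS.LCR$_ROW_RECORD;
  no_messages EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
  deqopt.consumer_name := 'ROW_LCR_AGENT_REMOTE';  -- assumed consumer name
  deqopt.wait          := DBMS_AQ.NO_WAIT;         -- return at once if the queue is empty

  -- The payload type matches the queue's payload type, so no conversion is needed.
  DBMS_AQ.DEQUEUE(
    queue_name         => 'strmadmin.oe_q_lcr',
    dequeue_options    => deqopt,
    message_properties => mprop,
    payload            => row_lcr,
    msgid              => msgid);

  DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE() || ' on ' ||
                       row_lcr.GET_OBJECT_OWNER() || '.' ||
                       row_lcr.GET_OBJECT_NAME());
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    DBMS_OUTPUT.PUT_LINE('No messages for this consumer');
END;
/
```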
See Also: "DBMS_STREAMS" in PL/SQL Packages and Types Reference for more information about the row LCR and DDL LCR conversion functions