Using the Publish-Subscribe Model for Applications


Because the database is the most significant resource of information within the enterprise, Oracle created a publish-subscribe solution for enterprise information delivery and messaging to complement this role. Topics in this chapter include:

Introduction to Publish-Subscribe

Networking technologies and products now enable a high degree of connectivity across a large number of computers, applications, and users. In these environments, it is important to provide asynchronous communications for the class of distributed systems that operate in a loosely-coupled and autonomous fashion, and which require operational immunity from network failures. This requirement has been filled by various middleware products that are characterized as messaging, message oriented middleware (MOM), message queuing, or publish-subscribe.

Applications that communicate through a publish and subscribe paradigm require the sending applications (publishers) to publish messages without explicitly specifying recipients or having knowledge of intended recipients. Similarly, receiving applications (subscribers) must receive only those messages that the subscriber has registered an interest in.

This decoupling between senders and recipients is usually accomplished by an intervening entity between the publisher and the subscriber, which serves as a level of indirection. This intervening entity is a queue that represents a subject or channel. Figure 17-1 illustrates publish and subscribe functionality.

Figure 17-1 Oracle Publish-Subscribe Functionality

Text description of adg81065.gif follows
Text description of the illustration adg81065.gif

A subscriber subscribes to a queue by expressing interest in messages enqueued to that queue and by using a subject- or content-based rule as a filter. This results in a set of rule-based subscriptions associated with a given queue.

At runtime, publishers post messages to various queues. The queue (in other words, the delivery mechanisms of the underlying infrastructure) then delivers messages that match the various subscriptions to the appropriate subscribers.

Publish-Subscribe Architecture

Oracle includes the following features to support database-enabled publish-subscribe messaging:

Database Events

Database events support declarative definitions for publishing database events, detection, and run-time publication of such events. This feature enables active publication of information to end-users in an event-driven manner, to complement the traditional pull-oriented approaches to accessing information.

Advanced Queuing

Oracle Advanced Queuing supports a queue-based publish-subscribe paradigm. Database queues serve as a durable store for messages, along with capabilities to allow publish and subscribe based on queues. A rules-engine and subscription service dynamically route messages to recipients based on expressed interest. This allows decoupling of addressing between senders and receivers to complement the existing explicit sender-receiver message addressing.

Client Notifications

Client notifications support asynchronous delivery of messages to interested subscribers. This enables database clients to register interest in certain queues, and it enables these clients to receive notifications when publications on such queues occur. Asynchronous delivery of messages to database clients is in contrast to the traditional polling techniques used to retrieve information.

Publish-Subscribe Concepts

This section describes various concepts related to publish-subscribe.


A queue is an entity that supports the notion of named subjects of interest. Queues can be characterized as:

non-persistent queue (lightweight queue)

The underlying queue infrastructure pushes the messages published to connected clients in a lightweight, at-best-once, manner.

persistent queue

Queues serve as durable containers for messages. Messages are delivered in a deferred and reliable mode.


Publishers and subscribers are internally represented as agents. There is a distinction between an agent and a client.

An agent is a persistent logical subscribing entity that expresses interest in a queue through a subscription. An agent has properties, such as an associated subscription, an address, and a delivery mode for messages. In this context, an agent is an electronic proxy for a publisher or subscriber.

A client is a transient physical entity. The attributes of a client include the physical process where the client programs run, the node name, and the client application logic. There could be several clients acting on behalf of a single agent. Also, the same client, if authorized, can act on behalf of multiple agents.

rule on a queue

A rule on a queue is specified as a conditional expression using a predefined set of operators on the message format attributes or on the message header attributes. Each queue has an associated message content format that describes the structure of the messages represented by that queue. The message format may be unstructured (RAW) or it may have a well-defined structure (ADT). This allows both subject- or content-based subscriptions.


Subscribers (agents) may specify subscriptions on a queue using a rule. Subscribers are durable and are stored in a catalog.

database event publication framework

The database represents a significant source for publishing information. An event framework is proposed to allow declarative definition of database event publication. As these pre-defined events occur, the framework detects and publishes such events. This allows active delivery of information to end-users in an event-driven manner as part of the publish-subscribe capability.


Registration is the process of associated delivery information by a given client, acting on behalf of an agent. There is an important distinction between the subscription and registration related to the agent/client separation.

Subscription indicates an interest in a particular queue by an agent. It does not specify where and how delivery must occur. Delivery information is a physical property that is associated with a client, and it is a transient manifestation of the logical agent (the subscriber). A specific client process acting on behalf of an agent registers delivery information by associating a host and port, indicating where the delivery should be done, and a callback, indicating how there delivery should be done.

publishing a message

Publishers publish messages to queues by using the appropriate queuing interfaces. The interfaces may depend on which model the queue is implemented on. For example, an enqueue call represents the publishing of a message.

rules engine

When a message is posted or published to a given queue, a rules engine extracts the set of candidate rules from all rules defined on that queue that match the published message.

subscription services

Corresponding to the list of candidate rules on a given queue, the set of subscribers that match the candidate rules can be evaluated. In turn, the set of agents corresponding to this subscription list can be determined and notified.


The queue notifies all registered clients of the appropriate published messages. This concept is called posting. When the queue needs to notify all interested clients, it posts the message to all registered clients.

receive a message

A subscriber may receive messages through any of the following mechanisms:

  • A client process acting on behalf of the subscriber specifies a callback using the registration mechanism. The posting mechanism then asynchronously invokes the callback when a message matches the subscriber’s subscription. The message content may be passed to the callback function (non-persistent queues only).
  • A client process acting on behalf of the subscriber specifies a callback using the registration mechanism. The posting mechanism then asynchronously invokes the callback function, but without the full message content. This serves as a notification to the client, which subsequently retrieves the message content in a pull fashion (persistent queues only).
  • A client process acting on behalf of the subscriber simply retrieves messages from the queue in a periodic, or some other appropriate, manner. While the messages are deferred, there is no asynchronous delivery to the end-client.

Examples of a Publish-Subscribe Mechanism


You may need to set up data structures, similar to the following, for certain examples to work:

CONNECT system/manager
CONNECT pubsub/pubsub

Scenario: This example shows how system events, client notification, and AQ work together to implement publish-subscribe.

  • Create under the user schema, pubsub, with all objects necessary to support a publish-subscribe mechanism. In this particular code, the Agent snoop subscribe to messages that are published at logon events. Note that the user pubsub needs AQ_ADMINISTRATOR_ROLE privileges to use AQ functionalities.
  • Rem ——————————————————
    REM create queue table for persistent multiple consumers:
    Rem ——————————————————

    CONNECT pubsub/pubsub;

    Rem Create or replace a queue table
    Queue_table => ‘Pubsub.Raw_msg_table’,
    Multiple_consumers => TRUE,
    Queue_payload_type => ‘RAW’,
    Compatible => ‘8.1’);
    Rem ——————————————————
    Rem Create a persistent queue for publishing messages:
    Rem ——————————————————

    Rem Create a queue for logon events
    Queue_name => ‘Pubsub.Logon’,
    Queue_table => ‘Pubsub.Raw_msg_table’,
    Comment => ‘Q for error triggers’);

    Rem ——————————————————
    Rem Start the queue:
    Rem ——————————————————


    Rem ——————————————————
    Rem define new_enqueue for convenience:
    Rem ——————————————————

    Queue_name IN VARCHAR2,
    Payload IN RAW ,
    Correlation IN VARCHAR2 := NULL,
    Exception_queue IN VARCHAR2 := NULL)

    Enq_ct DBMS_AQ.Enqueue_options_t;
    Msg_prop DBMS_AQ.Message_properties_t;
    Enq_msgid RAW(16);
    Userdata RAW(1000);

    Msg_prop.Exception_queue := Exception_queue;
    Msg_prop.Correlation := Correlation;
    Userdata := Payload;

    DBMS_AQ.ENQUEUE(Queue_name, Enq_ct, Msg_prop, Userdata, Enq_msgid);

    Rem ——————————————————
    Rem add subscriber with rule based on current user name,
    Rem using correlation_id
    Rem ——————————————————

    Subscriber Sys.Aq$_agent;
    Subscriber :=$_agent(‘SNOOP’, NULL, NULL);
    Queue_name => ‘Pubsub.logon’,
    Subscriber => subscriber,
    Rule => ‘CORRID = ”SCOTT” ‘);

    Rem ——————————————————
    Rem create a trigger on logon on database:
    Rem ——————————————————

    Rem create trigger on after logon:
    New_enqueue(‘Pubsub.Logon’, HEXTORAW(‘9999’), Dbms_standard.login_user);

  • After subscriptions are created, the next step is for the client to register for notification using callback functions. This is done using the Oracle Call Interface (OCI). The code below performs necessary steps for registration. The initial steps of allocating and initializing session handles are omitted here for sake of clarity.

    ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

    /* callback function for notification of logon of user ‘scott’ on database: */

    ub4 notifySnoop(ctx, subscrhp, pay, payl, desc, mode)
    dvoid *ctx;
    OCISubscription *subscrhp;
    dvoid *pay;
    ub4 payl;
    dvoid *desc;
    ub4 mode;
    printf(“Notification : User Scott Logged onn”);

    int main()
    OCISession *authp = (OCISession *) 0;
    OCISubscription *subscrhpSnoop = (OCISubscription *)0;

    Initialize OCI Process/Environment
    Initialize Server Contexts
    Connect to Server
    Set Service Context

    /* Registration Code Begins */
    /* Each call to initSubscriptionHn allocates
    and Initialises a Registration Handle */

    initSubscriptionHn( &subscrhpSnoop, /* subscription handle */
    “ADMIN:PUBSUB.SNOOP”, /* subscription name */
    /* : */
    (dvoid*)notifySnoop); /* callback function */

    The Client Process does not need a live Session for Callbacks
    End Session and Detach from Server

    OCISessionEnd ( svchp, errhp, authp, (ub4) OCI_DEFAULT);

    /* detach from server */
    OCIServerDetach( srvhp, errhp, OCI_DEFAULT);

    while (1) /* wait for callback */


    void initSubscriptionHn (subscrhp,

    OCISubscription **subscrhp;
    char* subscriptionName;
    dvoid * func;

    /* allocate subscription handle: */

    (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)subscrhp,
    (size_t) 0, (dvoid **) 0);

    /* set subscription name in handle: */

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
    (dvoid *) subscriptionName,
    (ub4) strlen((char *)subscriptionName),
    (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

    /* set callback function in handle: */

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
    (dvoid *) func, (ub4) 0,
    (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
    (dvoid *) 0, (ub4) 0,
    (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

    /* set namespace in handle: */

    (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION,
    (dvoid *) &namespace, (ub4) 0,
    (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

    checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp,


Now, if user SCOTT logged on to the database, the client is notified, and the call back function notifySnoop is called.


Leave a Reply

Your email address will not be published. Required fields are marked *