Creating and initializing subscribers


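This section discusses initializing a subscriber, creating the subscriber thread, and disposing of all allocated resources.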

Note: The following examples document a skeleton subscriber. An example is in the Protosubscriber folder in the Streaming folder.

Initializing a subscriber

Typically a subscriber allocates any global resources it needs, such as subscriber context blocks, during initialization.

The example below shows the initialization procedure from Protosubscriber.c.

Example 1: Initializing a subscriber.

/***********************************************************************************
 * Routine to initialize the subscriber. Creates a memory pool for allocating
 * subscriber contexts. A context is allocated and a new thread started with
 * calls to NewProtoSubscriber().
***********************************************************************************/
long InitProtoSubscriber( void )
    {
    ADD_PROTO_TRACE_L1( ProtoTraceBufPtr, kTraceInitSubscriber, 0, 0, 0 );

    /* Create the memory pool for allocating subscriber
     * contexts.
     */
    ProtoGlobals.contextPool = CreateMemPool( PR_SUBS_MAX_SUBSCRIPTIONS, 
                                                        sizeof(ProtoContext) );
    if ( ProtoGlobals.contextPool == NULL )
        return kDSNoMemErr;

    /* Return success */
    return kDSNoErr;
    }

Each subscriber requires different initialization actions. If you create a new subscriber, however, follow the initialization paradigm in Example 1 so that developers familiar with the standard subscribers will find yours easier to use.

In addition to the initialization routine, you also need to provide a routine that deallocates whatever the one-time initialization in InitProtoSubscriber() allocated. In the Protosubscriber example application, the function CloseProtoSubscriber() does this.
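The pairing of a one-time initialization routine with a matching close routine can be sketched in portable C. This is a minimal sketch, not the Protosubscriber source: `DemoPool`, `demo_create_pool()`, `demo_delete_pool()`, `DemoGlobals`, and the `DEMO_` result codes are hypothetical stand-ins for the streamer's memory pool calls and error codes.

```c
#include <stdlib.h>

/* Hypothetical stand-ins for the streamer's result codes. */
enum { DEMO_NO_ERR = 0, DEMO_NO_MEM_ERR = -1 };

#define DEMO_MAX_SUBSCRIPTIONS 4

typedef struct DemoContext {
    long creatorStatus;
} DemoContext;

/* A trivial fixed-size pool: one block holding 'count' entries. */
typedef struct DemoPool {
    size_t         count;
    size_t         entrySize;
    unsigned char *storage;
} DemoPool;

/* Global state shared by all instances of the subscriber. */
static struct { DemoPool *contextPool; } DemoGlobals;

static DemoPool *demo_create_pool(size_t count, size_t entrySize)
{
    DemoPool *pool = malloc(sizeof *pool);
    if (pool == NULL)
        return NULL;

    pool->storage = calloc(count, entrySize);
    if (pool->storage == NULL) {
        free(pool);
        return NULL;
    }
    pool->count = count;
    pool->entrySize = entrySize;
    return pool;
}

static void demo_delete_pool(DemoPool *pool)
{
    if (pool != NULL) {
        free(pool->storage);
        free(pool);
    }
}

/* One-time initialization: allocate the global context pool. */
long demo_init_subscriber(void)
{
    DemoGlobals.contextPool =
        demo_create_pool(DEMO_MAX_SUBSCRIPTIONS, sizeof(DemoContext));
    if (DemoGlobals.contextPool == NULL)
        return DEMO_NO_MEM_ERR;
    return DEMO_NO_ERR;
}

/* Matching teardown: release exactly what initialization allocated. */
long demo_close_subscriber(void)
{
    demo_delete_pool(DemoGlobals.contextPool);
    DemoGlobals.contextPool = NULL;
    return DEMO_NO_ERR;
}
```

Clearing the global pointer after the pool is deleted makes a second call to the close routine harmless, which is convenient when shutdown paths overlap.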

Creating the subscriber thread

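How a new subscriber is created depends on the subscriber. In general, the program allocates a subscriber context, allocates a signal for synchronizing with the new thread, creates the subscriber thread, waits for the thread to signal that it has finished initializing, and then checks the initialization status stored in the context (see Example 2):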

Example 2: Creating a new subscriber.

/**********************************************************************************
 * Routine to instantiate a new subscriber. Creates the subscriber thread and passes 
 * the new context (allocated from the global pool) to it. Creates the message port 
 * through which all subsequent communications between the subscriber and the 
 * streamer take place, as well as any other necessary per-context resources. 
***********************************************************************************/
long NewProtoSubscriber( ProtoContextPtr *pCtx, DSStreamCBPtr streamCBPtr, 
                                long deltaPriority )
    {
    long status;
    ProtoContextPtr ctx;
    ulong signalBits;

    ADD_PROTO_TRACE_L1( ProtoTraceBufPtr, kTraceNewSubscriber, 0, 0, 0 );

    /* Allocate a subscriber context */
    ctx = (ProtoContextPtr) AllocPoolMem( ProtoGlobals.contextPool );
    if ( ctx == NULL )
        return kDSNoMemErr;

    /* Record the creator's task and the stream, then allocate a signal to
     * synchronize with the completion of the subscriber's initialization.
     * The subscriber thread signals us with this when it has finished
     * initializing itself, successfully or not. */
    ctx->creatorTask         = CURRENTTASK->t.n_Item;    /* see "kernel.h" for this */
    ctx->streamCBPtr         = streamCBPtr;

    ctx->creatorSignal = AllocSignal( 0 );
    if ( ctx->creatorSignal == 0 )
        {
        status = kDSNoSignalErr;
        goto CLEANUP;
        }

    /* Create the thread that will handle all subscriber responsibilities. */
    status = NewThread( 
                (void *) &ProtoSubscriberThread,               /* thread entrypoint */
                4096,                                          /* initial stack size */
                (long) CURRENT_TASK_PRIORITY + deltaPriority,  /* priority */
                NULL,                                          /* name */
                &ctx->threadStackBlock,                        /* where to remember stack block addr */
                0,                                             /* initial R0 */
                ctx );                                         /* initial R1 */

    if ( status <= 0 )
        goto CLEANUP;
    else
        ctx->threadItem = status;

    /* Wait here while the subscriber initializes itself. When it's done, look at
     * the status returned to us in the context block to determine whether
     * initialization succeeded.
     */
    signalBits = WaitSignal( ctx->creatorSignal );
    if ( signalBits != ctx->creatorSignal )
        return kDSSignalErr;

    /* We're done with this signal, so give it back */
    FreeSignal( ctx->creatorSignal );

    /* Check the initialization status of the subscriber. If anything
     * failed, the 'ctx->creatorStatus' field will be set to a system result
     * code. If this is >= 0 then initialization was successful.
     */
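    status = ctx->creatorStatus;
    if ( status >= 0 )
        {
        *pCtx = ctx;        /* give the caller a copy of the context pointer */
        return status;        /* return 'success' */
        }

CLEANUP:
    /* Something went wrong while creating the subscriber. Free the thread's
     * stack block and return the context to the pool here, then return the
     * status that indicates the cause of the failure.
     */
    return status;
    }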
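The creator/thread handshake in Example 2 can be sketched in portable C. This sketch swaps in POSIX threads: a mutex and condition variable play the role of the signal obtained from AllocSignal() and awaited with WaitSignal(), and pthread_create() stands in for NewThread(). The names `DemoContext`, `demo_new_subscriber()`, `demo_dispose_subscriber()`, and the `failInit` knob are hypothetical and exist only for illustration. Return codes from the mutex and condition variable setup are ignored for brevity.

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct DemoContext {
    pthread_t       thread;
    pthread_mutex_t lock;
    pthread_cond_t  initDone;      /* plays the role of creatorSignal */
    int             signaled;      /* guards against spurious wakeups */
    long            creatorStatus; /* >= 0 means initialization succeeded */
    int             failInit;      /* demo knob: force initialization to fail */
} DemoContext;

static void *demo_subscriber_thread(void *arg)
{
    DemoContext *ctx = arg;

    /* Per-thread setup would go here; the knob simulates a failure. */
    long status = ctx->failInit ? -1 : 0;

    /* Report the result to the creator and wake it up. */
    pthread_mutex_lock(&ctx->lock);
    ctx->creatorStatus = status;
    ctx->signaled = 1;
    pthread_cond_signal(&ctx->initDone);
    pthread_mutex_unlock(&ctx->lock);

    /* A real subscriber would enter its message loop here on success. */
    return NULL;
}

/* Joins the thread and releases the per-context resources. */
void demo_dispose_subscriber(DemoContext *ctx)
{
    if (ctx == NULL)
        return;
    pthread_join(ctx->thread, NULL);
    pthread_cond_destroy(&ctx->initDone);
    pthread_mutex_destroy(&ctx->lock);
    free(ctx);
}

long demo_new_subscriber(DemoContext **pCtx, int failInit)
{
    long status;
    DemoContext *ctx = calloc(1, sizeof *ctx);
    if (ctx == NULL)
        return -1;

    ctx->failInit = failInit;
    pthread_mutex_init(&ctx->lock, NULL);
    pthread_cond_init(&ctx->initDone, NULL);

    if (pthread_create(&ctx->thread, NULL, demo_subscriber_thread, ctx) != 0) {
        status = -1;
        goto CLEANUP;
    }

    /* Wait here until the new thread reports its initialization status. */
    pthread_mutex_lock(&ctx->lock);
    while (!ctx->signaled)
        pthread_cond_wait(&ctx->initDone, &ctx->lock);
    status = ctx->creatorStatus;
    pthread_mutex_unlock(&ctx->lock);

    if (status >= 0) {
        *pCtx = ctx;    /* give the caller the context pointer */
        return status;
    }

    /* Initialization failed: the thread has finished, so reap it. */
    pthread_join(ctx->thread, NULL);

CLEANUP:
    pthread_cond_destroy(&ctx->initDone);
    pthread_mutex_destroy(&ctx->lock);
    free(ctx);
    return status;
}
```

A caller passes the address of a context pointer and checks for a non-negative result. On failure the pointer is left untouched and every resource acquired along the way has been released.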

Disposing of all allocated resources

In addition to an initialization routine, you also need a routine that disposes of all subscriber resources that were created. Deleting the subscriber thread should free all other resources the thread allocated. What remains is the thread's stack space, allocated during initialization, and the context block. In the Protosubscriber example, the DisposeProtoSubscriber() routine handles both.
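The order of disposal described above can be sketched as follows. This is an illustration under stated assumptions, not the body of DisposeProtoSubscriber(): `demo_delete_thread()` is an empty stand-in for deleting the thread, the static `demoPool` array stands in for the context pool, and all `Demo`/`demo_` names are hypothetical.

```c
#include <stdlib.h>

#define DEMO_MAX_SUBSCRIPTIONS 4

typedef struct DemoContext {
    int   threadItem;        /* > 0 while the subscriber thread exists */
    void *threadStackBlock;  /* stack memory allocated for the thread */
    int   inUse;             /* pool bookkeeping for this slot */
} DemoContext;

/* Stand-in for the global context pool. */
static DemoContext demoPool[DEMO_MAX_SUBSCRIPTIONS];

/* Stand-in for deleting the thread; a real system call would also
 * release whatever the thread itself had allocated. */
static void demo_delete_thread(int threadItem)
{
    (void) threadItem;
}

/* Hand out the first free slot in the pool, or NULL if none is left. */
static DemoContext *demo_alloc_context(void)
{
    for (int i = 0; i < DEMO_MAX_SUBSCRIPTIONS; i++) {
        if (!demoPool[i].inUse) {
            demoPool[i].inUse = 1;
            return &demoPool[i];
        }
    }
    return NULL;
}

long demo_dispose_subscriber(DemoContext *ctx)
{
    if (ctx == NULL)
        return 0;

    /* 1. Delete the thread first so nothing is still using the stack. */
    if (ctx->threadItem > 0) {
        demo_delete_thread(ctx->threadItem);
        ctx->threadItem = 0;
    }

    /* 2. Free the stack block the creator allocated for the thread. */
    free(ctx->threadStackBlock);
    ctx->threadStackBlock = NULL;

    /* 3. Return the context block to the pool for reuse. */
    ctx->inUse = 0;
    return 0;
}
```

Deleting the thread before freeing its stack matters: the reverse order would pull memory out from under a thread that may still be running.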