Playing a stream


To play a stream file on the 3DO system, an application has to perform the following actions:

The following sections look at each of these activities in some detail, using examples from the sample code provided on the Toolkit 1.5 CD-ROM.

Creating a message port and a message

The different threads that are part of a data streaming application communicate using messages. The application therefore has to create a message port and a message. This section first looks at sample code for creating a message port and a message, then discusses using messages in more detail.

About messages

In a multitasking system, the different tasks must communicate with each other while they process data, to make sure that two threads don't attempt to work with the same data simultaneously. The 3DO DataStreamer uses messages for this kind of locking because they offer queueing in addition to locking.

Using messages is discussed in some detail in the 3DO Portfolio System Programmer's Guide.

Example for setting up a message port

Example 1 below is taken from PlayCPakStream.c, which is part of the NuPlayer application that plays back a Cinepak stream. Note how both setup and cleanup have to be handled.

Example 1: Creating and removing a message port and a message.

    /* InitCPakPlayerFromStreamHeader() creates a message port and one
     * message item to communicate with the data streamer.
     */
    ctx->messagePort = NewMsgPort( NULL);
    if ( ctx->messagePort < 0 )
        goto CLEANUP;


    ctx->messageItem = CreateMsgItem( ctx->messagePort );
    if ( ctx->messageItem < 0 )
        goto CLEANUP;


    ctx->endOfStreamMessageItem = 
            CreateMsgItem( ctx->messagePort );
    if ( ctx->endOfStreamMessageItem < 0 )
        goto CLEANUP;

    /* Here's the code from DismantlePlayer(),
     * which is called by CLEANUP,
     * that removes the items created above.
     */
    if ( ctx->messageItem )
        RemoveMsgItem( ctx->messageItem );
    if ( ctx->endOfStreamMessageItem )
        RemoveMsgItem( ctx->endOfStreamMessageItem );
    if ( ctx->messagePort )
        RemoveMsgPort( ctx->messagePort );
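
The setup and cleanup pairing in Example 1 can be sketched in isolation as follows. This is only an illustration under stated assumptions: DemoNewItem(), DemoRemoveItem(), DemoCtx, DemoSetup(), DemoDismantle(), and the two counters are hypothetical stand-ins invented here, not Portfolio calls. The sketch also assumes that valid items are positive and that failures are negative, so its cleanup tests for "> 0" where the sample tests for nonzero; a nonzero test would also accept a negative error code left in the context by a failed call.

```c
#include <assert.h>
#include <stddef.h>

typedef long Item;               /* stand-in for the Portfolio Item type */

/* Hypothetical item factory: fails with a negative code once the
 * budget is used up, so that the failure path can be exercised. */
int gDemoBudget = 3;             /* creations still allowed to succeed */
int gDemoLive   = 0;             /* items currently allocated */
static Item gDemoNextId = 100;

Item DemoNewItem(void)
{
    if (gDemoBudget <= 0)
        return -1;               /* negative value = error code */
    gDemoBudget--;
    gDemoLive++;
    return gDemoNextId++;
}

void DemoRemoveItem(Item item)
{
    (void) item;
    gDemoLive--;
}

typedef struct DemoCtx {
    Item messagePort;
    Item messageItem;
    Item endOfStreamMessageItem;
} DemoCtx;

/* Removes only the items that were really created, then clears the
 * fields so that a second call is harmless. */
void DemoDismantle(DemoCtx *ctx)
{
    if (ctx->messageItem > 0)            DemoRemoveItem(ctx->messageItem);
    if (ctx->endOfStreamMessageItem > 0) DemoRemoveItem(ctx->endOfStreamMessageItem);
    if (ctx->messagePort > 0)            DemoRemoveItem(ctx->messagePort);
    ctx->messageItem = ctx->endOfStreamMessageItem = ctx->messagePort = 0;
}

/* Returns 0 on success, -1 on failure (after cleaning up). */
int DemoSetup(DemoCtx *ctx)
{
    /* Clear first so that cleanup can tell what exists. */
    ctx->messagePort = ctx->messageItem = ctx->endOfStreamMessageItem = 0;

    ctx->messagePort = DemoNewItem();
    if (ctx->messagePort < 0)
        goto CLEANUP;

    ctx->messageItem = DemoNewItem();
    if (ctx->messageItem < 0)
        goto CLEANUP;

    ctx->endOfStreamMessageItem = DemoNewItem();
    if (ctx->endOfStreamMessageItem < 0)
        goto CLEANUP;

    return 0;

CLEANUP:
    DemoDismantle(ctx);
    return -1;
}
```

Clearing the context before the first allocation is what lets a single cleanup routine serve every failure point.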

The message-sending mechanism

The streamer thread uses messages to communicate with the data acquisition thread and subscriber thread. A common message header is used as a preamble to all messages sent by or sent to the streamer thread. The values of the message header communicate the status of the thread that sends it to the thread that receives it. The message header is defined in DataStream.h as follows:

Example 2: Message header.

/* The following preamble is used for all types of messages sent
 * by the streamer.   
 */
#define     DS_MSG_HEADER                                                  \
    long     whatToDo;      /* opcode determining msg contents */          \
    Item     msgItem;       /* msg item for sending this buffer */         \
    void *   privatePtr;    /* ptr to sender's private data */             \
    void *   link;          /* user-defined, for linking into lists */

The following table shows the fields in the common message header.

Table 1:  Message header fields.
--------------------------------------------------------
Field       |Description                                
--------------------------------------------------------
whatToDo    |Specifies the type of request being made by
            |the message sender. Its value typically    
            |appears in a switch() statement and is used
            |to access various union fields in the      
            |remainder of the message structure.        
--------------------------------------------------------
msgItem     |System item number of the message used to  
            |send data to a receiver. The receiver also 
            |passes it to ReplyMsg() to signal that the 
            |requested operation is complete.           
--------------------------------------------------------
privatePtr  |Stores a pointer to a private data         
            |structure that the sender uses for         
            |post-processing after the receiver calls   
            |ReplyMsg().                                
--------------------------------------------------------
link        |General-purpose linking field to form      
            |queues of messages received from the       
            |streamer thread. Other threads can use it  
            |to queue messages.                         
--------------------------------------------------------
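
How a receiver might use whatToDo to select the matching union member can be sketched as follows. The opcode names, the DemoMsg layout, and HandleDemoMsg() are hypothetical and exist only for this illustration; the real opcodes and message structures are the ones defined in DataStream.h.

```c
#include <assert.h>
#include <stddef.h>

typedef long Item;   /* stand-in for the Portfolio Item type */

/* The same four fields as the common header in Example 2. */
#define DEMO_MSG_HEADER  \
    long   whatToDo;     \
    Item   msgItem;      \
    void  *privatePtr;   \
    void  *link;

/* Hypothetical opcodes, not the values used by the streamer. */
enum { kDemoOpData = 1, kDemoOpStop = 2 };

typedef struct DemoMsg {
    DEMO_MSG_HEADER
    union {
        struct { void *buffer; long size; } data;   /* for kDemoOpData */
        struct { long options; }            stop;   /* for kDemoOpStop */
    } msg;
} DemoMsg;

/* Returns the payload size for data messages, 0 for a stop request,
 * and -1 for an opcode this receiver does not understand. */
long HandleDemoMsg(const DemoMsg *m)
{
    switch (m->whatToDo) {
    case kDemoOpData:
        return m->msg.data.size;     /* only 'data' is valid here */
    case kDemoOpStop:
        return 0;                    /* only 'stop' is valid here */
    default:
        return -1;
    }
}
```

Because every message starts with the same header, a receiver can read whatToDo before it knows anything else about the message.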

Using ReplyMsg()

When a data acquisition thread or subscriber thread completes a process, it must inform the library using the system routine ReplyMsg(). This is necessary because such completions can trigger other actions. For example, when the data acquisition thread replies to a request to fill a buffer, the reply adds the filled buffer to a list of filled buffers that will subsequently be parsed and their data distributed to the appropriate subscriber thread.

When a subscriber thread replies to a message that announces the availability of a new data chunk, the reply signals that the subscriber has finished using that data. Once all subscribers using data in a buffer have signaled that they are finished with their portions of the buffer, the buffer can be reused.
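
The buffer reuse rule described above can be sketched with a simple use count. This guide does not show the streamer's internal bookkeeping, so the sketch is only one plausible model: DemoBuf, DemoPool, DemoDeliver(), and DemoReply() are hypothetical names, and no real messages are sent.

```c
#include <assert.h>
#include <stddef.h>

typedef struct DemoBuf {
    struct DemoBuf *next;       /* link in the free list */
    long            useCount;   /* subscribers still using the buffer */
} DemoBuf;

typedef struct DemoPool {
    DemoBuf *freeList;          /* buffers that can be refilled */
} DemoPool;

/* Called when chunks from 'buf' have been handed to 'subscribers'
 * different subscriber threads. */
void DemoDeliver(DemoBuf *buf, long subscribers)
{
    buf->useCount = subscribers;
}

/* Called once for each subscriber reply. Returns 1 if this reply was
 * the last one and the buffer went back on the free list, else 0. */
int DemoReply(DemoPool *pool, DemoBuf *buf)
{
    if (buf->useCount <= 0)
        return 0;               /* unexpected extra reply: ignore it */
    if (--buf->useCount > 0)
        return 0;               /* other subscribers are still busy */

    buf->next = pool->freeList; /* last reply: buffer is reusable */
    pool->freeList = buf;
    return 1;
}
```

With two subscribers sharing a buffer, the first call to DemoReply() leaves the buffer in use and the second call returns it to the pool.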

Allocating data buffers

This section looks at sample code that creates data buffers used by an application using the DataStreamer. It also briefly looks at some heuristics for deciding on buffer size.

Example for creating a buffer list

Example 3 below, from PrepareStream.c, which is part of the NuPlayer application, creates a buffer list. A program that uses NuPlayer can specify the number of stream buffers and the streamblock size directly, or extract them from the stream header of the stream file. The values in the stream header come from the Weaver script. Example 4 shows the second approach.

Example 3: Creating a buffer list.

/***********************************************************************************
 * Routine to allocate and initialize a buffer list for use with the streamer
***********************************************************************************/
DSDataBufPtr     CreateBufferList( long numBuffers, long bufferSize )
    {
#    define            ROUND_TO_LONG(x)    (((x) + 3) & ~3)
    DSDataBufPtr     bp;
    DSDataBufPtr     firstBp;
    long            totalBufferSpace;
    long            actualEntrySize;
    long            bufferNum;
    
    actualEntrySize = sizeof(DSDataBuf) + ROUND_TO_LONG( bufferSize );
    totalBufferSpace = numBuffers * actualEntrySize;

    /* Try to allocate the needed memory */
    firstBp = (DSDataBufPtr) NewPtr( totalBufferSpace, MEMTYPE_ANY );
    if ( firstBp == NULL )
        return NULL;

    /* Loop to take the big buffer and split it into "N" buffers
     * of size "bufferSize", linked together.
     */
    for ( bp = firstBp, bufferNum = 0; bufferNum < (numBuffers - 1); bufferNum++ )
        {
        bp->next            = (DSDataBufPtr) ( ((char *) bp) + actualEntrySize );
        bp->permanentNext     = bp->next;

        /* Advance to point to the next entry */
        bp = bp->next;
        }

    /* Make last entry's forward link point to nil */
    bp->next = NULL;
    bp->permanentNext = NULL;

    /* Return a pointer to the first buffer in the list 
     */
    return firstBp;
    }

Example 4: Using stream header information with CreateBufferList().

    /* Allocate the stream buffers and build the linked list of
     * buffers that are input to the streamer.
     */
    ctx->bufferList = 
        CreateBufferList( ctx->hdr.streamBuffers, ctx->hdr.streamBlockSize );
    if ( ctx->bufferList == NULL )
        return kPSMemFullErr;
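
The layout that CreateBufferList() builds, one allocation carved into equally sized entries, can be tried out on a host machine with the sketch below. It is not the NuPlayer code: DemoDataBuf models only the two link fields of DSDataBuf, malloc() replaces NewPtr(), and all Demo names are hypothetical. The sketch rounds each entry up to the host pointer size rather than to 4 bytes, so that entries stay aligned on machines with 64-bit pointers, and it rejects a buffer count of zero.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for DSDataBuf: link fields only. */
typedef struct DemoDataBuf {
    struct DemoDataBuf *next;
    struct DemoDataBuf *permanentNext;
} DemoDataBuf;

/* Round x up to a multiple of the host pointer size. */
#define DEMO_ROUND_UP(x) \
    ((((size_t) (x)) + sizeof(void *) - 1) / sizeof(void *) * sizeof(void *))

/* Bytes occupied by one list entry: header plus padded data area. */
size_t DemoEntrySize(long bufferSize)
{
    return sizeof(DemoDataBuf) + DEMO_ROUND_UP(bufferSize);
}

/* Allocates one block and links it into numBuffers entries. */
DemoDataBuf *DemoCreateBufferList(long numBuffers, long bufferSize)
{
    size_t entrySize;
    char  *block;
    long   i;

    if (numBuffers <= 0 || bufferSize <= 0)
        return NULL;

    entrySize = DemoEntrySize(bufferSize);
    block = (char *) malloc((size_t) numBuffers * entrySize);
    if (block == NULL)
        return NULL;

    for (i = 0; i < numBuffers; i++) {
        DemoDataBuf *bp = (DemoDataBuf *) (block + (size_t) i * entrySize);

        /* The last entry terminates the list. */
        bp->next = (i + 1 < numBuffers)
            ? (DemoDataBuf *) (block + (size_t) (i + 1) * entrySize)
            : NULL;
        bp->permanentNext = bp->next;
    }
    return (DemoDataBuf *) block;
}

/* Walks the 'next' links and counts the entries. */
long DemoCountBuffers(const DemoDataBuf *bp)
{
    long n = 0;

    for (; bp != NULL; bp = bp->next)
        n++;
    return n;
}
```

Because the list is a single allocation, the sketch releases it with one free() call on the pointer that DemoCreateBufferList() returned.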

Deciding on buffer size

Each buffer you allocate must be large enough to hold one streamblock. How large the buffer should be depends on what your application does:

For more information

See Debugging and Optimization

Launching the required threads

Before a program can use the DataStreamer, it has to initialize the data acquisition, streamer, and subscriber threads, discussed in detail in DataStreamer threads and data flow. This section looks at some sample code for initializing the threads.

Initializing and connecting data acquisition and streamer threads

Initialization of the data acquisition and streamer threads consists of three steps:

  1. Call InitDataAcq() and InitDataStreaming() to initialize the memory pools for the threads.

  2. Call NewDataAcq() and NewDataStream() to create the threads themselves. The prototypes show which information you have to specify:

    int32 NewDataStream (DSStreamCBPtr *pCtx, void* bufferListPtr, long bufferSize, long deltaPriority, long numSubsMsgs)
    int32 NewDataAcq (AcqContextPtr *pCtx, char* fileName, long deltaPriority)

    When the threads are created, information about them, such as buffer size or streamblock size, can either be supplied directly or be extracted from the stream header, as in the example below.

  3. Finally, call DSConnect() to connect the streamer and the data acquisition thread.

Example 5: Initializing data acquisition and streamer threads.

/* Initialize data acquisition for use with 1 file */
    status = InitDataAcq( 1 );
    if ( status != 0 )
        goto CLEANUP;

/* Initialize data acquisition for the specified file */
    status = NewDataAcq( &ctx->acqContext, streamFileName,
                                 ctx->hdr.dataAcqDeltaPri );
    if ( status != 0 )
        goto CLEANUP;

    /* Initialize for 1 streamer thread */
    status = InitDataStreaming( 1 );
    if ( status != 0 )
        goto CLEANUP;

    status = NewDataStream( &ctx->streamCBPtr,     /* output: stream control block ptr */
                    ctx->bufferList,               /* pointer to buffer list */
                    ctx->hdr.streamBlockSize,      /* size of each buffer */
                    ctx->hdr.streamerDeltaPri,     /* streamer thread rel. priority */
                    ctx->hdr.numSubsMsgs );        /* number of subscriber messages */
    if ( status != 0 )
        goto CLEANUP;

/* Connect the stream to its data supplier */
    status = DSConnect( ctx->messageItem, NULL, ctx->streamCBPtr, 
                            ctx->acqContext->requestPort );
    CHECK_DS_RESULT( "DSConnect()", status );
    if ( status != 0 )
        goto CLEANUP;


Initializing and connecting subscriber threads

Example 6 shows how to initialize and set up subscriber threads. The program that wants to use the DataStreamer has to loop through all subscribers in the subscriber list found in the header of the stream file. That list is generated by the Weaver based on the subscribers used in the Weaver script. For each subscriber, the program calls the subscriber's Init and New routines, then calls DSSubscribe() to attach the subscriber to the stream, as the Cinepak case in the example does.

The example is taken from the NuPlayer application and launches a Cinepak subscriber.

Example 6: Preparing subscriber threads.

/* Loop through the subscriber descriptor table and initialize all subscribers      */

    for ( subscriberIndex = 0; 
        ctx->hdr.subscriberList[ subscriberIndex ].subscriberType != 0;
        subscriberIndex++ )
        {
        subsPtr = ctx->hdr.subscriberList + subscriberIndex;
        
        switch ( subsPtr->subscriberType )
            {
            case FILM_CHUNK_TYPE:
                status = InitCPakSubscriber();
                CHECK_DS_RESULT( "InitCPakSubscriber", status );
                if ( status != 0 )
                    goto CLEANUP;

                status = NewCPakSubscriber( &ctx->cpakContextPtr, 1, 
                            subsPtr->deltaPriority );
                CHECK_DS_RESULT( "NewCPakSubscriber", status );
                if ( status != 0 )
                    goto CLEANUP;

                status = DSSubscribe( ctx->messageItem,           // control msg item
                            NULL,                                 // synchronous call
                            ctx->streamCBPtr,                     // stream context block
                            (DSDataType) FILM_CHUNK_TYPE,         // subscriber data type
                            ctx->cpakContextPtr->requestPort );   // subscriber message port
                CHECK_DS_RESULT( "DSSubscribe for cinepak", status );
                if ( status != 0 )
                    goto CLEANUP;

                status = InitCPakCel( ctx->streamCBPtr,   // Needed for DSGetClock in sub
                            ctx->cpakContextPtr,          // The subscriber's context
                            &ctx->cpakChannelPtr,         // The channel's context
                            0,                            // The channel number
                            true );                       // true = flush on DS synch msg
                CHECK_DS_RESULT( "InitCPakCel", status );
                if ( status != 0 )
                    goto CLEANUP;
                break;