view libmpdemux/demux_rtp.cpp @ 11619:179138947307

This patch contains bugfixes for the esd audio output driver that I uncovered while trying to send sound to a remote esd server over a wireless (11 mbs, just enough to handle to sound) link. First, the sound was full "ticking" sounds. I found a bug that prevented the "send the remainder of this block" code from ever being called - so large chunks of audio were simply being ignored. Fixing this bug removed the "ticking" from audio streams. Fixing this bug, however, uncovered another problem - when the socket buffer was full, doing a blocking write to finish the buffer would take far too long and would turn video into a chunky mess. I'd imagine this blocking write would be fine for an audio-only stream, but it turns out to hold up the video far too much. The solution in this patch is to write as much data as possible to the socket, and then return as soon as possible, reporting the number of bytes actually written accurately back to mplayer. I've tested it on both local and remote esd servers, and it works well. Patch by Benjamin Osheroff <ben@gimbo.net>
author attila
date Wed, 10 Dec 2003 12:19:13 +0000
parents 384f6a88a31d
children a79a318585ce
line wrap: on
line source

////////// Routines (with C-linkage) that interface between "MPlayer"
////////// and the "LIVE.COM Streaming Media" libraries:

extern "C" {
#include "demux_rtp.h"
#include "stheader.h"
}
#include "demux_rtp_internal.h"

#include "BasicUsageEnvironment.hh"
#include "liveMedia.hh"
#include "GroupsockHelper.hh"
#include <unistd.h>

extern "C" stream_t* stream_open_sdp(int fd, off_t fileSize,
				     int* file_format) {
  *file_format = DEMUXER_TYPE_RTP;
  stream_t* newStream = NULL;
  do {
    char* sdpDescription = (char*)malloc(fileSize+1);
    if (sdpDescription == NULL) break;

    ssize_t numBytesRead = read(fd, sdpDescription, fileSize);
    if (numBytesRead != fileSize) break;
    sdpDescription[fileSize] = '\0'; // to be safe

    newStream = (stream_t*)calloc(sizeof (stream_t), 1);
    if (newStream == NULL) break;

    // Store the SDP description in the 'priv' field, for later use:
    newStream->priv = sdpDescription; 
  } while (0);
  return newStream;
}

extern "C" int _rtsp_streaming_seek(int /*fd*/, off_t /*pos*/,
				    streaming_ctrl_t* /*streaming_ctrl*/) {
  return -1; // For now, we don't handle RTSP stream seeking
}

extern "C" int rtsp_streaming_start(stream_t* stream) {
  stream->streaming_ctrl->streaming_seek = _rtsp_streaming_seek;

  return 0;
}

// A data structure representing input data for each stream:
class ReadBufferQueue {
public:
  ReadBufferQueue(MediaSubsession* subsession, demuxer_t* demuxer,
		  char const* tag);
  virtual ~ReadBufferQueue();

  FramedSource* readSource() const { return fReadSource; }
  RTPSource* rtpSource() const { return fRTPSource; }
  demuxer_t* ourDemuxer() const { return fOurDemuxer; }
  char const* tag() const { return fTag; }

  char blockingFlag; // used to implement synchronous reads

  // For A/V synchronization:
  Boolean prevPacketWasSynchronized;
  float prevPacketPTS;
  ReadBufferQueue** otherQueue;

  // The 'queue' actually consists of just a single "demux_packet_t"
  // (because the underlying OS does the actual queueing/buffering):
  demux_packet_t* dp;

  // However, we sometimes inspect buffers before delivering them.
  // For this, we maintain a queue of pending buffers:
  void savePendingBuffer(demux_packet_t* dp);
  demux_packet_t* getPendingBuffer();

private:
  demux_packet_t* pendingDPHead;
  demux_packet_t* pendingDPTail;

  FramedSource* fReadSource;
  RTPSource* fRTPSource;
  demuxer_t* fOurDemuxer;
  char const* fTag; // used for debugging
};

// A structure of RTP-specific state, kept so that we can cleanly
// reclaim it:
typedef struct RTPState {
  char const* sdpDescription;
  RTSPClient* rtspClient;
  SIPClient* sipClient;
  MediaSession* mediaSession;
  ReadBufferQueue* audioBufferQueue;
  ReadBufferQueue* videoBufferQueue;
  unsigned flags;
  struct timeval firstSyncTime;
};

extern "C" char* network_username;
extern "C" char* network_password;
static char* openURL_rtsp(RTSPClient* client, char const* url) {
  // If we were given a user name (and optional password), then use them: 
  if (network_username != NULL) {
    char const* password = network_password == NULL ? "" : network_password;
    return client->describeWithPassword(url, network_username, password);
  } else {
    return client->describeURL(url);
  }
}

static char* openURL_sip(SIPClient* client, char const* url) {
  // If we were given a user name (and optional password), then use them: 
  if (network_username != NULL) {
    char const* password = network_password == NULL ? "" : network_password;
    return client->inviteWithPassword(url, network_username, password);
  } else {
    return client->invite(url);
  }
}

int rtspStreamOverTCP = 0; 

extern "C" void demux_open_rtp(demuxer_t* demuxer) {
  do {
    TaskScheduler* scheduler = BasicTaskScheduler::createNew();
    if (scheduler == NULL) break;
    UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);
    if (env == NULL) break;

    RTSPClient* rtspClient = NULL;
    SIPClient* sipClient = NULL;

    if (demuxer == NULL || demuxer->stream == NULL) break;  // shouldn't happen
    demuxer->stream->eof = 0; // just in case 

    // Look at the stream's 'priv' field to see if we were initiated
    // via a SDP description:
    char* sdpDescription = (char*)(demuxer->stream->priv);
    if (sdpDescription == NULL) {
      // We weren't given a SDP description directly, so assume that
      // we were given a RTSP or SIP URL:
      char const* protocol = demuxer->stream->streaming_ctrl->url->protocol;
      char const* url = demuxer->stream->streaming_ctrl->url->url;
      extern int verbose;
      if (strcmp(protocol, "rtsp") == 0) {
	rtspClient = RTSPClient::createNew(*env, verbose, "MPlayer");
	if (rtspClient == NULL) {
	  fprintf(stderr, "Failed to create RTSP client: %s\n",
		  env->getResultMsg());
	  break;
	}
	sdpDescription = openURL_rtsp(rtspClient, url);
      } else { // SIP
	unsigned char desiredAudioType = 0; // PCMU (use 3 for GSM)
	sipClient = SIPClient::createNew(*env, desiredAudioType, NULL,
					 verbose, "MPlayer");
	if (sipClient == NULL) {
	  fprintf(stderr, "Failed to create SIP client: %s\n",
		  env->getResultMsg());
	  break;
	}
	sipClient->setClientStartPortNum(8000);
	sdpDescription = openURL_sip(sipClient, url);
      }

      if (sdpDescription == NULL) {
	fprintf(stderr, "Failed to get a SDP description from URL \"%s\": %s\n",
		url, env->getResultMsg());
	break;
      }
    }

    // Now that we have a SDP description, create a MediaSession from it:
    MediaSession* mediaSession = MediaSession::createNew(*env, sdpDescription);
    if (mediaSession == NULL) break;


    // Create a 'RTPState' structure containing the state that we just created,
    // and store it in the demuxer's 'priv' field, for future reference:
    RTPState* rtpState = new RTPState;
    rtpState->sdpDescription = sdpDescription;
    rtpState->rtspClient = rtspClient;
    rtpState->sipClient = sipClient;
    rtpState->mediaSession = mediaSession;
    rtpState->audioBufferQueue = rtpState->videoBufferQueue = NULL;
    rtpState->flags = 0;
    rtpState->firstSyncTime.tv_sec = rtpState->firstSyncTime.tv_usec = 0;
    demuxer->priv = rtpState;

    // Create RTP receivers (sources) for each subsession:
    MediaSubsessionIterator iter(*mediaSession);
    MediaSubsession* subsession;
    unsigned desiredReceiveBufferSize;
    while ((subsession = iter.next()) != NULL) {
      // Ignore any subsession that's not audio or video:
      if (strcmp(subsession->mediumName(), "audio") == 0) {
	desiredReceiveBufferSize = 100000;
      } else if (strcmp(subsession->mediumName(), "video") == 0) {
	desiredReceiveBufferSize = 2000000;
      } else {
	continue;
      }

      if (!subsession->initiate()) {
	fprintf(stderr, "Failed to initiate \"%s/%s\" RTP subsession: %s\n", subsession->mediumName(), subsession->codecName(), env->getResultMsg());
      } else {
	fprintf(stderr, "Initiated \"%s/%s\" RTP subsession\n", subsession->mediumName(), subsession->codecName());

	// Set the OS's socket receive buffer sufficiently large to avoid
	// incoming packets getting dropped between successive reads from this
	// subsession's demuxer.  Depending on the bitrate(s) that you expect,
	// you may wish to tweak the "desiredReceiveBufferSize" values above.
	int rtpSocketNum = subsession->rtpSource()->RTPgs()->socketNum();
	int receiveBufferSize
	  = increaseReceiveBufferTo(*env, rtpSocketNum,
				    desiredReceiveBufferSize);
	if (verbose > 0) {
	  fprintf(stderr, "Increased %s socket receive buffer to %d bytes \n",
		  subsession->mediumName(), receiveBufferSize);
	}

	if (rtspClient != NULL) {
	  // Issue a RTSP "SETUP" command on the chosen subsession:
	  if (!rtspClient->setupMediaSubsession(*subsession, False,
						rtspStreamOverTCP)) break;
	}
      }
    }

    if (rtspClient != NULL) {
      // Issue a RTSP aggregate "PLAY" command on the whole session:
      if (!rtspClient->playMediaSession(*mediaSession)) break;
    } else if (sipClient != NULL) {
      sipClient->sendACK(); // to start the stream flowing
    }

    // Now that the session is ready to be read, do additional
    // MPlayer codec-specific initialization on each subsession:
    iter.reset();
    while ((subsession = iter.next()) != NULL) {
      if (subsession->readSource() == NULL) continue; // not reading this

      unsigned flags = 0;
      if (strcmp(subsession->mediumName(), "audio") == 0) {
	rtpState->audioBufferQueue
	  = new ReadBufferQueue(subsession, demuxer, "audio");
	rtpState->audioBufferQueue->otherQueue = &(rtpState->videoBufferQueue);
	rtpCodecInitialize_audio(demuxer, subsession, flags);
      } else if (strcmp(subsession->mediumName(), "video") == 0) {
	rtpState->videoBufferQueue
	  = new ReadBufferQueue(subsession, demuxer, "video");
	rtpState->videoBufferQueue->otherQueue = &(rtpState->audioBufferQueue);
	rtpCodecInitialize_video(demuxer, subsession, flags);
      }
      rtpState->flags |= flags;
    }
  } while (0);
}

extern "C" int demux_is_mpeg_rtp_stream(demuxer_t* demuxer) {
  // Get the RTP state that was stored in the demuxer's 'priv' field:
  RTPState* rtpState = (RTPState*)(demuxer->priv);

  return (rtpState->flags&RTPSTATE_IS_MPEG12_VIDEO) != 0;
}

extern "C" int demux_is_multiplexed_rtp_stream(demuxer_t* demuxer) {
  // Get the RTP state that was stored in the demuxer's 'priv' field:
  RTPState* rtpState = (RTPState*)(demuxer->priv);

  return (rtpState->flags&RTPSTATE_IS_MULTIPLEXED) != 0;
}

static demux_packet_t* getBuffer(demuxer_t* demuxer, demux_stream_t* ds,
				 Boolean mustGetNewData,
				 float& ptsBehind); // forward

extern "C" int demux_rtp_fill_buffer(demuxer_t* demuxer, demux_stream_t* ds) {
  // Get a filled-in "demux_packet" from the RTP source, and deliver it.
  // Note that this is called as a synchronous read operation, so it needs
  // to block in the (hopefully infrequent) case where no packet is
  // immediately available.

  while (1) {
    float ptsBehind;
    demux_packet_t* dp = getBuffer(demuxer, ds, False, ptsBehind); // blocking
    if (dp == NULL) return 0;

    if (demuxer->stream->eof) return 0; // source stream has closed down
  
    // Before using this packet, check to make sure that its presentation
    // time is not far behind the other stream (if any).  If it is,
    // then we discard this packet, and get another instead.  (The rest of
    // MPlayer doesn't always do a good job of synchronizing when the
    // audio and video streams get this far apart.)
    // (We don't do this when streaming over TCP, because then the audio and
    // video streams are interleaved.)
    const float ptsBehindThreshold = 1.0; // seconds
    if (ptsBehind < ptsBehindThreshold || rtspStreamOverTCP) { // packet's OK
      ds_add_packet(ds, dp);
      break;
    }
    
#ifdef DEBUG_PRINT_DISCARDED_PACKETS
    RTPState* rtpState = (RTPState*)(demuxer->priv);
    ReadBufferQueue* bufferQueue = ds == demuxer->video ? rtpState->videoBufferQueue : rtpState->audioBufferQueue;
    fprintf(stderr, "Discarding %s packet (%fs behind)\n", bufferQueue->tag(), ptsBehind);
#endif
    free_demux_packet(dp); // give back this packet, and get another one
  }

  return 1;
}

Boolean awaitRTPPacket(demuxer_t* demuxer, demux_stream_t* ds,
		       unsigned char*& packetData, unsigned& packetDataLen,
		       float& pts) {
  // Similar to "demux_rtp_fill_buffer()", except that the "demux_packet"
  // is not delivered to the "demux_stream".
  float ptsBehind;
  demux_packet_t* dp = getBuffer(demuxer, ds, True, ptsBehind); // blocking
  if (dp == NULL) return False;

  packetData = dp->buffer;
  packetDataLen = dp->len;
  pts = dp->pts;

  return True;
}

Boolean insertRTPData(demuxer_t* demuxer, demux_stream_t* ds,
		      unsigned char* data, unsigned dataLen) {
  // Begin by finding the buffer queue that we want to add data to.
  // (Get this from the RTP state, which we stored in
  //  the demuxer's 'priv' field)
  RTPState* rtpState = (RTPState*)(demuxer->priv);
  ReadBufferQueue* bufferQueue = NULL;
  if (ds == demuxer->video) {
    bufferQueue = rtpState->videoBufferQueue;
  } else if (ds == demuxer->audio) {
    bufferQueue = rtpState->audioBufferQueue;
  } else {
    fprintf(stderr, "(demux_rtp)insertRTPData: internal error: unknown stream\n");
    return False;
  }

  if (data == NULL || dataLen == 0) return False;

  demux_packet_t* dp = new_demux_packet(dataLen);
  if (dp == NULL) return False;

  // Copy our data into the buffer, and save it:
  memmove(dp->buffer, data, dataLen);
  dp->len = dataLen;
  dp->pts = 0;
  bufferQueue->savePendingBuffer(dp);
  return True;
}

static void teardownRTSPorSIPSession(RTPState* rtpState); // forward

extern "C" void demux_close_rtp(demuxer_t* demuxer) {
  // Reclaim all RTP-related state:

  // Get the RTP state that was stored in the demuxer's 'priv' field:
  RTPState* rtpState = (RTPState*)(demuxer->priv);
  if (rtpState == NULL) return;

  teardownRTSPorSIPSession(rtpState);

  UsageEnvironment* env = NULL;
  TaskScheduler* scheduler = NULL;
  if (rtpState->mediaSession != NULL) {
    env = &(rtpState->mediaSession->envir());
    scheduler = &(env->taskScheduler());
  }
  Medium::close(rtpState->mediaSession);
  Medium::close(rtpState->rtspClient);
  Medium::close(rtpState->sipClient);
  delete rtpState->audioBufferQueue;
  delete rtpState->videoBufferQueue;
  delete rtpState->sdpDescription;
  delete rtpState;

  delete env; delete scheduler;
}

////////// Extra routines that help implement the above interface functions:

#define MAX_RTP_FRAME_SIZE 50000
    // >= the largest conceivable frame composed from one or more RTP packets

static void afterReading(void* clientData, unsigned frameSize,
			 struct timeval presentationTime) {
  if (frameSize >= MAX_RTP_FRAME_SIZE) {
    fprintf(stderr, "Saw an input frame too large (>=%d).  Increase MAX_RTP_FRAME_SIZE in \"demux_rtp.cpp\".\n",
	    MAX_RTP_FRAME_SIZE);
  }
  ReadBufferQueue* bufferQueue = (ReadBufferQueue*)clientData;
  demuxer_t* demuxer = bufferQueue->ourDemuxer();
  RTPState* rtpState = (RTPState*)(demuxer->priv);

  if (frameSize > 0) demuxer->stream->eof = 0;

  demux_packet_t* dp = bufferQueue->dp;
  dp->len = frameSize;

  // Set the packet's presentation time stamp, depending on whether or
  // not our RTP source's timestamps have been synchronized yet: 
  Boolean hasBeenSynchronized
    = bufferQueue->rtpSource()->hasBeenSynchronizedUsingRTCP();
  if (hasBeenSynchronized) {
    if (verbose > 0 && !bufferQueue->prevPacketWasSynchronized) {
      fprintf(stderr, "%s stream has been synchronized using RTCP \n",
	      bufferQueue->tag());
    }

    struct timeval* fst = &(rtpState->firstSyncTime); // abbrev
    if (fst->tv_sec == 0 && fst->tv_usec == 0) {
      *fst = presentationTime;
    }
    
    // For the "pts" field, use the time differential from the first
    // synchronized time, rather than absolute time, in order to avoid
    // round-off errors when converting to a float:
    dp->pts = presentationTime.tv_sec - fst->tv_sec
      + (presentationTime.tv_usec - fst->tv_usec)/1000000.0;
    bufferQueue->prevPacketPTS = dp->pts;
  } else {
    if (verbose > 0 && bufferQueue->prevPacketWasSynchronized) {
      fprintf(stderr, "%s stream is no longer RTCP-synchronized \n",
	      bufferQueue->tag());
    }

    // use the previous packet's "pts" once again:
    dp->pts = bufferQueue->prevPacketPTS;
  }
  bufferQueue->prevPacketWasSynchronized = hasBeenSynchronized;

  dp->pos = demuxer->filepos;
  demuxer->filepos += frameSize;

  // Signal any pending 'doEventLoop()' call on this queue:
  bufferQueue->blockingFlag = ~0;
}

static void onSourceClosure(void* clientData) {
  ReadBufferQueue* bufferQueue = (ReadBufferQueue*)clientData;
  demuxer_t* demuxer = bufferQueue->ourDemuxer();

  demuxer->stream->eof = 1;

  // Signal any pending 'doEventLoop()' call on this queue:
  bufferQueue->blockingFlag = ~0;
}

static demux_packet_t* getBuffer(demuxer_t* demuxer, demux_stream_t* ds,
				 Boolean mustGetNewData,
				 float& ptsBehind) {
  // Begin by finding the buffer queue that we want to read from:
  // (Get this from the RTP state, which we stored in
  //  the demuxer's 'priv' field)
  RTPState* rtpState = (RTPState*)(demuxer->priv);
  ReadBufferQueue* bufferQueue = NULL;
  if (ds == demuxer->video) {
    bufferQueue = rtpState->videoBufferQueue;
  } else if (ds == demuxer->audio) {
    bufferQueue = rtpState->audioBufferQueue;
  } else {
    fprintf(stderr, "(demux_rtp)getBuffer: internal error: unknown stream\n");
    return NULL;
  }

  if (bufferQueue == NULL || bufferQueue->readSource() == NULL) {
    fprintf(stderr, "(demux_rtp)getBuffer failed: no appropriate RTP subsession has been set up\n");
    return NULL;
  }
  
  demux_packet_t* dp;
  if (!mustGetNewData) {
    // Check whether we have a previously-saved buffer that we can use:
    dp = bufferQueue->getPendingBuffer();
    if (dp != NULL) {
      ptsBehind = 0.0; // so that we always accept this data
      return dp;
    }
  }

  // Allocate a new packet buffer, and arrange to read into it:
  dp = new_demux_packet(MAX_RTP_FRAME_SIZE);
  bufferQueue->dp = dp;
  if (dp == NULL) return NULL;

  // Schedule the read operation:
  bufferQueue->blockingFlag = 0;
  bufferQueue->readSource()->getNextFrame(dp->buffer, MAX_RTP_FRAME_SIZE,
					  afterReading, bufferQueue,
					  onSourceClosure, bufferQueue);
  // Block ourselves until data becomes available:
  TaskScheduler& scheduler
    = bufferQueue->readSource()->envir().taskScheduler();
  scheduler.doEventLoop(&bufferQueue->blockingFlag);

  // Set the "ptsBehind" result parameter:
  if (bufferQueue->prevPacketPTS != 0.0
      && bufferQueue->prevPacketWasSynchronized
      && *(bufferQueue->otherQueue) != NULL
      && (*(bufferQueue->otherQueue))->prevPacketPTS != 0.0
      && (*(bufferQueue->otherQueue))->prevPacketWasSynchronized) {
    ptsBehind = (*(bufferQueue->otherQueue))->prevPacketPTS
		 - bufferQueue->prevPacketPTS;
  } else {
    ptsBehind = 0.0;
  }

  if (mustGetNewData) {
    // Save this buffer for future reads:
    bufferQueue->savePendingBuffer(dp);
  }

  return dp;
}

static void teardownRTSPorSIPSession(RTPState* rtpState) {
  MediaSession* mediaSession = rtpState->mediaSession;
  if (mediaSession == NULL) return;
  if (rtpState->rtspClient != NULL) {
    MediaSubsessionIterator iter(*mediaSession);
    MediaSubsession* subsession;

    while ((subsession = iter.next()) != NULL) {
      rtpState->rtspClient->teardownMediaSubsession(*subsession);
    }
  } else if (rtpState->sipClient != NULL) {
    rtpState->sipClient->sendBYE();
  }
}

////////// "ReadBuffer" and "ReadBufferQueue" implementation:

ReadBufferQueue::ReadBufferQueue(MediaSubsession* subsession,
				 demuxer_t* demuxer, char const* tag)
  : prevPacketWasSynchronized(False), prevPacketPTS(0.0), otherQueue(NULL),
    dp(NULL), pendingDPHead(NULL), pendingDPTail(NULL),
    fReadSource(subsession == NULL ? NULL : subsession->readSource()),
    fRTPSource(subsession == NULL ? NULL : subsession->rtpSource()),
    fOurDemuxer(demuxer), fTag(strdup(tag)) {
} 

ReadBufferQueue::~ReadBufferQueue() {
  delete fTag;

  // Free any pending buffers (that never got delivered):
  demux_packet_t* dp = pendingDPHead;
  while (dp != NULL) {
    demux_packet_t* dpNext = dp->next;
    dp->next = NULL;
    free_demux_packet(dp);
    dp = dpNext;
  }
}

void ReadBufferQueue::savePendingBuffer(demux_packet_t* dp) {
  // Keep this buffer around, until MPlayer asks for it later:
  if (pendingDPTail == NULL) {
    pendingDPHead = pendingDPTail = dp;
  } else {
    pendingDPTail->next = dp;
    pendingDPTail = dp;
  }
  dp->next = NULL;
}

demux_packet_t* ReadBufferQueue::getPendingBuffer() {
  demux_packet_t* dp = pendingDPHead;
  if (dp != NULL) {
    pendingDPHead = dp->next;
    if (pendingDPHead == NULL) pendingDPTail = NULL; 

    dp->next = NULL;
  }

  return dp;
}