asyncPacketQueue.h
Engine/source/platform/async/asyncPacketQueue.h
Time-based packet streaming.
Classes:
class AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >
Time-based packet stream queue.
class AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::QueuedPacket
Information about the time slice covered by an individual packet currently on the queue.
Detailed Description
Time-based packet streaming.
The classes contained in this file can be used for any kind of continuous playback that depends on discrete samplings of a source stream (i.e. any kind of digital media streaming).
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#ifndef _ASYNCPACKETQUEUE_H_ 25#define _ASYNCPACKETQUEUE_H_ 26 27#ifndef _TFIXEDSIZEQUEUE_H_ 28#include "core/util/tFixedSizeDeque.h" 29#endif 30 31#ifndef _TSTREAM_H_ 32#include "core/stream/tStream.h" 33#endif 34 35#ifndef _TYPETRAITS_H_ 36#include "platform/typetraits.h" 37#endif 38 39 40//#define DEBUG_SPEW 41 42 43/// @file 44/// Time-based packet streaming. 45/// 46/// The classes contained in this file can be used for any kind 47/// of continuous playback that depends on discrete samplings of 48/// a source stream (i.e. any kind of digital media streaming). 49 50 51 52//-------------------------------------------------------------------------- 53// Async packet queue. 
54//-------------------------------------------------------------------------- 55 56/// Time-based packet stream queue. 57/// 58/// AsyncPacketQueue writes data packets to a consumer stream in sync to 59/// a tick time source. Outdated packets may optionally be dropped automatically 60/// by the queue. A fixed maximum number of packets can reside in the queue 61/// concurrently at any one time. 62/// 63/// Be aware that using single item queues for synchronizing to a timer 64/// will usually result in bad timing behavior when packet uploading takes 65/// any non-trivial amount of time. 66/// 67/// @note While the queue associates a variable tick count with each 68/// individual packet, the queue fill status is measured in number of 69/// packets rather than in total tick time. 70/// 71/// @param Packet Value type of packets passed through this queue. 72/// @param TimeSource Value type for time tick source to which the queue 73/// is synchronized. 74/// @param Consumer Value type of stream to which the packets are written. 75/// 76template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 > 77class AsyncPacketQueue 78{ 79 public: 80 81 typedef void Parent; 82 83 /// The type of data packets being streamed through this queue. 84 typedef typename TypeTraits< Packet >::BaseType PacketType; 85 86 /// The type of consumer that receives the packets from this queue. 87 typedef typename TypeTraits< Consumer >::BaseType ConsumerType; 88 89 /// 90 typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType; 91 92 /// Type for counting ticks. 93 typedef Tick TickType; 94 95 protected: 96 97 /// Information about the time slice covered by an 98 /// individual packet currently on the queue. 99 struct QueuedPacket 100 { 101 /// First tick contained in this packet. 102 TickType mStartTick; 103 104 /// First tick *not* contained in this packet anymore. 
105 TickType mEndTick; 106 107 QueuedPacket( TickType start, TickType end ) 108 : mStartTick( start ), mEndTick( end ) {} 109 110 /// Return the total number of ticks in this packet. 111 TickType getNumTicks() const 112 { 113 return ( mEndTick - mStartTick ); 114 } 115 }; 116 117 typedef FixedSizeDeque< QueuedPacket> PacketQueue; 118 119 /// If true, packets that have missed their proper queuing timeframe 120 /// will be dropped. If false, they will be queued nonetheless. 121 bool mDropPackets; 122 123 /// Total number of ticks spanned by the total queue playback time. 124 /// If this is zero, the total queue time is considered to be infinite. 125 TickType mTotalTicks; 126 127 /// Total number of ticks submitted to the queue so far. 128 TickType mTotalQueuedTicks; 129 130 /// Queue that holds records for each packet currently in the queue. New packets 131 /// are added to back. 132 PacketQueue mPacketQueue; 133 134 /// The time source to which we are sync'ing. 135 TimeSource mTimeSource; 136 137 /// The output stream that this queue feeds into. 138 Consumer mConsumer; 139 140 /// Total number of packets queued so far. 141 U32 mTotalQueuedPackets; 142 143 public: 144 145 /// Construct an AsyncPacketQueue of the given length. 146 /// 147 /// @param maxQueuedPackets The length of the queue in packets. Only a maximum of 148 /// 'maxQueuedPackets' packets can be concurrently in the queue at any one time. 149 /// @param timeSource The tick time source to which the queue synchronizes. 150 /// @param consumer The output stream that receives the packets in sync to timeSource. 151 /// @param totalTicks The total number of ticks that will be played back through the 152 /// queue; if 0, the length is considered indefinite. 153 /// @param dropPackets Whether the queue should drop outdated packets; if dropped, a 154 /// packet will not reach the consumer. 
155 AsyncPacketQueue( U32 maxQueuedPackets, 156 TimeSource timeSource, 157 Consumer consumer, 158 TickType totalTicks = 0, 159 bool dropPackets = false ) 160 : mDropPackets( dropPackets ), 161 mTotalTicks( totalTicks ), 162 mTotalQueuedTicks( 0 ), 163 mPacketQueue( maxQueuedPackets ), 164 mTimeSource( timeSource ), 165 mConsumer( consumer ) 166 167 { 168 #ifdef TORQUE_DEBUG 169 mTotalQueuedPackets = 0; 170 #endif 171 } 172 173 /// Return true if there are currently 174 bool isEmpty() const { return mPacketQueue.isEmpty(); } 175 176 /// Return true if all packets have been streamed. 177 bool isAtEnd() const; 178 179 /// Return true if the queue needs one or more new packets to be submitted. 180 bool needPacket(); 181 182 /// Submit a data packet to the queue. 183 /// 184 /// @param packet The data packet. 185 /// @param packetTicks The duration of the packet in ticks. 186 /// @param isLast If true, the packet is the last one in the stream. 187 /// @param packetPos The absolute position of the packet in the stream; if this is not supplied 188 /// the packet is assumed to immediately follow the preceding packet. 189 /// 190 /// @return true if the packet has been queued or false if it has been dropped. 191 bool submitPacket( Packet packet, 192 TickType packetTicks, 193 bool isLast = false, 194 TickType packetPos = TypeTraits< TickType >::MAX ); 195 196 /// Return the current playback position according to the time source. 197 TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); } 198 199 /// Return the total number of ticks that have been queued so far. 200 TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; } 201 202 /// Return the total number of packets that have been queued so far. 
203 U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; } 204}; 205 206template< typename Packet, typename TimeSource, typename Consumer, typename Tick > 207inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const 208{ 209 // Never at end if infinite. 210 211 if( !mTotalTicks ) 212 return false; 213 214 // Otherwise, we're at end if we're past the total tick count. 215 216 return ( getCurrentTick() >= mTotalTicks 217 && ( mDropPackets || mTotalQueuedTicks >= mTotalTicks ) ); 218} 219 220template< typename Packet, typename TimeSource, typename Consumer, typename Tick > 221bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket() 222{ 223 // Never need more packets once we have reached the 224 // end. 225 226 if( isAtEnd() ) 227 return false; 228 229 // Always needs packets while the queue is not 230 // filled up completely. 231 232 if( mPacketQueue.capacity() != 0 ) 233 return true; 234 235 // Unqueue packets that have expired their playtime. 236 237 TickType currentTick = getCurrentTick(); 238 while( mPacketQueue.size() && currentTick >= mPacketQueue.front().mEndTick ) 239 { 240 #ifdef DEBUG_SPEW 241 Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)", 242 mTotalQueuedPackets - mPacketQueue.size(), 243 U32( mPacketQueue.front().mStartTick ), 244 U32( mPacketQueue.front().mEndTick ), 245 U32( currentTick ), 246 mPacketQueue.size() ); 247 #endif 248 249 mPacketQueue.popFront(); 250 } 251 252 // Need more packets if the queue isn't full anymore. 253 254 return ( mPacketQueue.capacity() != 0 ); 255} 256 257template< typename Packet, typename TimeSource, typename Consumer, typename Tick > 258bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos ) 259{ 260 AssertFatal( mPacketQueue.capacity() != 0, 261 "AsyncPacketQueue::submitPacket() - Queue is full!" 
); 262 263 TickType packetStartPos; 264 TickType packetEndPos; 265 266 if( packetPos != TypeTraits< TickType >::MAX ) 267 { 268 packetStartPos = packetPos; 269 packetEndPos = packetPos + packetTicks; 270 } 271 else 272 { 273 packetStartPos = mTotalQueuedTicks; 274 packetEndPos = mTotalQueuedTicks + packetTicks; 275 } 276 277 // Check whether the packet is outdated, if enabled. 278 279 bool dropPacket = false; 280 if( mDropPackets ) 281 { 282 TickType currentTick = getCurrentTick(); 283 if( currentTick >= packetEndPos ) 284 dropPacket = true; 285 } 286 287 #ifdef DEBUG_SPEW 288 Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s", 289 mTotalQueuedPackets, 290 U32( mTotalQueuedTicks ), 291 U32( packetEndPos ), 292 U32( packetTicks ), 293 U32( getCurrentTick() ), 294 mPacketQueue.size(), 295 dropPacket ? " !! DROPPED !!" : "" ); 296 #endif 297 298 // Queue the packet. 299 300 if( !dropPacket ) 301 { 302 mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) ); 303 Deref( mConsumer ).write( &packet, 1 ); 304 } 305 306 mTotalQueuedTicks = packetEndPos; 307 if( isLast && !mTotalTicks ) 308 mTotalTicks = mTotalQueuedTicks; 309 310 mTotalQueuedPackets ++; 311 312 return !dropPacket; 313} 314 315#undef DEBUG_SPEW 316#endif // _ASYNCPACKETQUEUE_H_ 317
