Skip to content

Commit

Permalink
Update JitterBuffer to use rtpbuffer
Browse files Browse the repository at this point in the history
Remove PriorityQueue
  • Loading branch information
Sean-Der committed Oct 10, 2024
1 parent bff0e3d commit 60a8329
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 426 deletions.
24 changes: 22 additions & 2 deletions internal/rtpbuffer/retainable_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (p *RetainablePacket) Payload() []byte {
return p.payload
}

// Packet returns a RTP Packet for a RetainablePacket
func (p *RetainablePacket) Packet() *rtp.Packet {
return &rtp.Packet{
Header: *p.Header(),
Payload: p.Payload(),

Check warning on line 40 in internal/rtpbuffer/retainable_packet.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/retainable_packet.go#L37-L40

Added lines #L37 - L40 were not covered by tests
}
}

// Retain increases the reference count of the RetainablePacket
func (p *RetainablePacket) Retain() error {
p.countMu.Lock()
Expand All @@ -46,10 +54,15 @@ func (p *RetainablePacket) Retain() error {
}

// Release decreases the reference count of the RetainablePacket and frees if needed
func (p *RetainablePacket) Release() {
func (p *RetainablePacket) Release(force bool) {
p.countMu.Lock()
defer p.countMu.Unlock()
p.count--

if !force {
p.count--
} else {
p.count = 0

Check warning on line 64 in internal/rtpbuffer/retainable_packet.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/retainable_packet.go#L64

Added line #L64 was not covered by tests
}

if p.count == 0 {
// release back to pool
Expand All @@ -59,3 +72,10 @@ func (p *RetainablePacket) Release() {
p.payload = nil
}
}

func (p *RetainablePacket) getCount() int {
p.countMu.Lock()
defer p.countMu.Unlock()

Check warning on line 78 in internal/rtpbuffer/retainable_packet.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/retainable_packet.go#L76-L78

Added lines #L76 - L78 were not covered by tests

return p.count

Check warning on line 80 in internal/rtpbuffer/retainable_packet.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/retainable_packet.go#L80

Added line #L80 was not covered by tests
}
37 changes: 35 additions & 2 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *RTPBuffer) Add(packet *RetainablePacket) {
idx := i % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release()
prevPacket.Release(false)
}
r.packets[idx] = nil
}
Expand All @@ -72,7 +72,7 @@ func (r *RTPBuffer) Add(packet *RetainablePacket) {
idx := seq % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release()
prevPacket.Release(false)
}
r.packets[idx] = packet
r.lastAdded = seq
Expand Down Expand Up @@ -101,3 +101,36 @@ func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
}
return pkt
}

// GetTimestamp returns a RetainablePacket for the requested timestamp
func (r *RTPBuffer) GetTimestamp(timestamp uint32) *RetainablePacket {
for i := range r.packets {
pkt := r.packets[i]
if pkt != nil && pkt.Header() != nil && pkt.Header().Timestamp == timestamp {
if err := pkt.Retain(); err != nil {
return nil

Check warning on line 111 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L106-L111

Added lines #L106 - L111 were not covered by tests
}

return pkt

Check warning on line 114 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L114

Added line #L114 was not covered by tests
}
}
return nil

Check warning on line 117 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L117

Added line #L117 was not covered by tests
}

// Length returns the count of valid RetainablePackets in the RTPBuffer
func (r *RTPBuffer) Length() (length uint16) {
for i := range r.packets {
if r.packets[i] != nil && r.packets[i].getCount() != 0 {
length++

Check warning on line 124 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L121-L124

Added lines #L121 - L124 were not covered by tests
}
}

return

Check warning on line 128 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L128

Added line #L128 was not covered by tests
}

// Clear erases all the packets in the RTPBuffer
func (r *RTPBuffer) Clear() {
r.lastAdded = 0
r.started = false
r.packets = make([]*RetainablePacket, r.size)

Check warning on line 135 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L132-L135

Added lines #L132 - L135 were not covered by tests
}
4 changes: 2 additions & 2 deletions internal/rtpbuffer/rtpbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestRTPBuffer(t *testing.T) {
if packet.Header().SequenceNumber != seq {
t.Errorf("packet for %d returned with incorrect SequenceNumber: %d", seq, packet.Header().SequenceNumber)
}
packet.Release()
packet.Release(false)
}
}
assertNOTGet := func(nums ...uint16) {
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestRTPBuffer_Overridden(t *testing.T) {
retrieved := sb.Get(1)
require.NotNil(t, retrieved)
require.Equal(t, "originalContent", string(retrieved.Payload()))
retrieved.Release()
retrieved.Release(false)
require.Equal(t, 1, retrieved.count)

// ensure original packet is released
Expand Down
78 changes: 42 additions & 36 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package jitterbuffer
import (
"errors"

"github.com/pion/interceptor/internal/rtpbuffer"
"github.com/pion/rtp"
)

Expand All @@ -20,8 +21,12 @@ type Event string
var (
// ErrBufferUnderrun is returned when the buffer has no items
ErrBufferUnderrun = errors.New("invalid Peek: Empty jitter buffer")

// ErrPopWhileBuffering is returned if a jitter buffer is not in a playback state
ErrPopWhileBuffering = errors.New("attempt to pop while buffering")

// ErrNotFound is returned when a packet does not exist for a SequenceNumber
ErrNotFound = errors.New("packet with sequence number was not found")
)

const (
Expand Down Expand Up @@ -63,7 +68,8 @@ type (
// order, and allows removing in either sequence number order or via a
// provided timestamp
type JitterBuffer struct {
packets *PriorityQueue
packets *rtpbuffer.RTPBuffer
packetFactory rtpbuffer.PacketFactoryNoOp
minStartCount uint16
lastSequence uint16
playoutHead uint16
Expand All @@ -88,11 +94,16 @@ type Stats struct {

// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
rtpBuffer, err := rtpbuffer.NewRTPBuffer(rtpbuffer.Uint16SizeHalf)
if err != nil || rtpBuffer == nil {
return nil

Check warning on line 99 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L99

Added line #L99 was not covered by tests
}

jb := &JitterBuffer{
state: Buffering,
stats: Stats{0, 0, 0},
minStartCount: 50,
packets: NewQueue(),
packets: rtpBuffer,
listeners: make(map[Event][]EventListener),
}

Expand Down Expand Up @@ -142,18 +153,20 @@ func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// the data so if the memory is expected to be reused, the caller should
// take this in to account and pass a copy of the packet they wish to buffer
func (jb *JitterBuffer) Push(packet *rtp.Packet) {
if jb.packets.Length() == 0 {
if packetsLen := jb.packets.Length(); packetsLen == 0 {
if !jb.playoutReady {
jb.playoutHead = packet.SequenceNumber
}

jb.emit(StartBuffering)
}
if jb.packets.Length() > 100 {
} else if packetsLen > 100 {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}
if !jb.playoutReady && jb.packets.Length() == 0 {
jb.playoutHead = packet.SequenceNumber
}

jb.updateStats(packet.SequenceNumber)
jb.packets.Push(packet, packet.SequenceNumber)
retainablePkt, _ := jb.packetFactory.NewPacket(&packet.Header, packet.Payload, 0, 0)
jb.packets.Add(retainablePkt)
jb.updateState()
}

Expand Down Expand Up @@ -184,67 +197,60 @@ func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) {
return nil, ErrBufferUnderrun
}
if playoutHead && jb.state == Emitting {
return jb.packets.Find(jb.playoutHead)
return jb.PeekAtSequence(jb.playoutHead)
}
return jb.packets.Find(jb.lastSequence)
return jb.PeekAtSequence(jb.lastSequence)
}

// Pop an RTP packet from the jitter buffer at the current playout head
func (jb *JitterBuffer) Pop() (*rtp.Packet, error) {
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAt(jb.playoutHead)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return nil, err
}
jb.playoutHead = (jb.playoutHead + 1)
jb.updateState()
return packet, nil
return jb.PopAtSequence(jb.playoutHead)
}

// PopAtSequence will pop an RTP packet from the jitter buffer at the specified Sequence
func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) {
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAt(sq)
if err != nil {
retainablePacket := jb.packets.Get(sq)
if retainablePacket == nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return nil, err
return nil, ErrNotFound
}

defer retainablePacket.Release(true)
jb.playoutHead = (jb.playoutHead + 1)
jb.updateState()
return packet, nil
return retainablePacket.Packet(), nil
}

// PeekAtSequence will return an RTP packet from the jitter buffer at the specified Sequence
// without removing it from the buffer
func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) {
packet, err := jb.packets.Find(sq)
if err != nil {
return nil, err
retainablePacket := jb.packets.Get(sq)
if retainablePacket == nil {
return nil, ErrNotFound

Check warning on line 233 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L233

Added line #L233 was not covered by tests
}
return packet, nil
return retainablePacket.Packet(), nil
}

// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp
// Call this method repeatedly to drain the buffer at the timestamp
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { //nolint: revive
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAtTimestamp(ts)
if err != nil {
retainablePacket := jb.packets.GetTimestamp(ts)
if retainablePacket == nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return nil, err
return nil, ErrNotFound
}

defer retainablePacket.Release(true)
jb.updateState()
return packet, nil
return retainablePacket.Packet(), nil
}

// Clear will empty the buffer and optionally reset the state
Expand Down
2 changes: 1 addition & 1 deletion pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestJitterBuffer(t *testing.T) {
assert.Equal(pkt.SequenceNumber, uint16(5002))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i))
sqnum := uint16((6000 + i))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
pkt, err = jb.Peek(true)
Expand Down
Loading

0 comments on commit 60a8329

Please sign in to comment.