Skip to content

Commit

Permalink
JitterBuffer: Add a JitterBuffer-based Interceptor
Browse files Browse the repository at this point in the history
The JitterBufferInterceptor is designed to fit in a RemoteStream
pipeline and buffer incoming packets for a short period (currently
defaulting to 50 packets) before emitting packets to be consumed by the
next step in the pipeline.

The caller must ensure they are prepared to handle an
ErrPopWhileBuffering in the case that insufficient packets have been
received by the jitter buffer. The caller should retry the operation
at some point later as the buffer may have been filled in the interim.

The caller should also be aware that an ErrBufferUnderrun may be
returned in the case that the initial buffering was sufficient and
playback began but the caller is consuming packets (or they are not
arriving) quickly enough.
  • Loading branch information
thatsnotright committed Apr 19, 2024
1 parent 1449b4f commit 6d74945
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 5 deletions.
4 changes: 2 additions & 2 deletions attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) {
}

// GetRTPHeader gets the RTP header if present. If it is not present, it will be
// unmarshalled from the raw byte slice and stored in the attribtues.
// unmarshalled from the raw byte slice and stored in the attributes.
func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
if val, ok := a[rtpHeaderKey]; ok {
if header, ok := val.(*rtp.Header); ok {
Expand All @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
}

// GetRTCPPackets gets the RTCP packets if present. If the packet slice is not
// present, it will be unmarshaled from the raw byte slice and stored in the
// present, it will be unmarshalled from the raw byte slice and stored in the
// attributes.
func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) {
if val, ok := a[rtcpPacketsKey]; ok {
Expand Down
7 changes: 5 additions & 2 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
for {
i, _, err := s.rtpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if err.Error() == "attempt to pop while buffering" {
continue
}
if errors.Is(err, io.EOF) {
s.rtpInModified <- RTPWithError{Err: err}
}
Expand Down Expand Up @@ -160,12 +163,12 @@ func (s *MockStream) WriteRTP(p *rtp.Packet) error {
return err
}

// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream
// ReceiveRTCP schedules a new rtcp batch, so it can be read by the stream
func (s *MockStream) ReceiveRTCP(pkts []rtcp.Packet) {
s.rtcpIn <- pkts
}

// ReceiveRTP schedules a rtp packet, so it can be read be the stream
// ReceiveRTP schedules a rtp packet, so it can be read by the stream
func (s *MockStream) ReceiveRTP(packet *rtp.Packet) {
s.rtpIn <- packet
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,16 @@ func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
jb.updateState()
return packet, nil
}

// Clear will empty the buffer and optionally reset the state
func (jb *JitterBuffer) Clear(resetState bool) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
jb.packets.Clear()
if resetState {
jb.lastSequence = 0
jb.state = Buffering
jb.stats = Stats{0, 0, 0}
jb.minStartCount = 50
}
}
17 changes: 16 additions & 1 deletion pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestJitterBuffer(t *testing.T) {

jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5012))
assert.Equal(jb.stats.outOfOrderCount, uint32(1))
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
Expand Down Expand Up @@ -214,4 +213,20 @@ func TestJitterBuffer(t *testing.T) {
assert.NotNil(pkt)
}
})

t.Run("Allows clearing the buffer", func(*testing.T) {
jb := New()
jb.Clear(false)

assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5002))
jb.Clear(true)
assert.Equal(jb.lastSequence, uint16(0))
assert.Equal(jb.stats.outOfOrderCount, uint32(0))
assert.Equal(jb.packets.Length(), uint16(0))
})
}
19 changes: 19 additions & 0 deletions pkg/jitterbuffer/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"github.com/pion/logging"
)

// ReceiverInterceptorOption can be used to configure ReceiverInterceptor
type ReceiverInterceptorOption func(d *ReceiverInterceptor) error

// Log sets a logger for the interceptor
func Log(log logging.LeveledLogger) ReceiverInterceptorOption {
return func(d *ReceiverInterceptor) error {
d.log = log
return nil
}
}
10 changes: 10 additions & 0 deletions pkg/jitterbuffer/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,13 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
}
return nil, ErrNotFound
}

// Clear will empty a PriorityQueue
func (q *PriorityQueue) Clear() {
next := q.next
q.length = 0
for next != nil {
next.prev = nil
next = next.next
}
}
15 changes: 15 additions & 0 deletions pkg/jitterbuffer/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,18 @@ func TestPriorityQueue_Find(t *testing.T) {
_, err = packets.Find(1001)
assert.Error(t, err)
}

func TestPriorityQueue_Clean(t *testing.T) {
packets := NewQueue()
packets.Clear()
packets.Push(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1000,
Timestamp: 5,
SSRC: 5,
},
Payload: []uint8{0xA},
}, 1000)
assert.EqualValues(t, 1, packets.Length())
packets.Clear()
}
110 changes: 110 additions & 0 deletions pkg/jitterbuffer/receiver_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"sync"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)

// InterceptorFactory is a interceptor.Factory for a GeneratorInterceptor
type InterceptorFactory struct {
opts []ReceiverInterceptorOption
}

// NewInterceptor constructs a new ReceiverInterceptor
func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &ReceiverInterceptor{
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"),
buffer: New(),
}

for _, opt := range g.opts {
if err := opt(i); err != nil {
return nil, err

Check warning on line 29 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L29

Added line #L29 was not covered by tests
}
}

return i, nil
}

// ReceiverInterceptor places a JitterBuffer in the chain to smooth packet arrival
// and allow for network jitter
//
// The Interceptor is designed to fit in a RemoteStream
// pipeline and buffer incoming packets for a short period (currently
// defaulting to 50 packets) before emitting packets to be consumed by the
// next step in the pipeline.
//
// The caller must ensure they are prepared to handle an
// ErrPopWhileBuffering in the case that insufficient packets have been
// received by the jitter buffer. The caller should retry the operation
// at some point later as the buffer may have been filled in the interim.
//
// The caller should also be aware that an ErrBufferUnderrun may be
// returned in the case that the initial buffering was sufficient and
// playback began but the caller is consuming packets (or they are not
// arriving) quickly enough.
type ReceiverInterceptor struct {
interceptor.NoOp
buffer *JitterBuffer
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
}

// NewInterceptor returns a new InterceptorFactory
func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, error) {
return &InterceptorFactory{opts}, nil
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
buf := make([]byte, len(b))
n, attr, err := reader.Read(buf, a)
if err != nil {
return n, attr, err
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buf); err != nil {
return 0, nil, err

Check warning on line 78 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L78

Added line #L78 was not covered by tests
}
i.m.Lock()
defer i.m.Unlock()
i.buffer.Push(packet)
if i.buffer.state == Emitting {
newPkt, err := i.buffer.Pop()
if err != nil {
return 0, nil, err

Check warning on line 86 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L86

Added line #L86 was not covered by tests
}
nlen, err := newPkt.MarshalTo(b)
return nlen, attr, err
}
return n, attr, ErrPopWhileBuffering
})
}

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)

Check warning on line 100 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L96-L100

Added lines #L96 - L100 were not covered by tests
}

// Close closes the interceptor
func (i *ReceiverInterceptor) Close() error {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
return nil
}
98 changes: 98 additions & 0 deletions pkg/jitterbuffer/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"bytes"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestBufferStart(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.Zero(t, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(0),
}})

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)
select {
case pkt := <-stream.ReadRTP():
assert.EqualValues(t, nil, pkt)
default:
// No data ready to read, this is what we expect
}
err = i.Close()
assert.NoError(t, err)
assert.Zero(t, buf.Len())
}

func TestReceiverBuffersAndPlaysout(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
for s := 0; s < 61; s++ {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(s),
}})
}
// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)
for s := 0; s < 10; s++ {
read := <-stream.ReadRTP()
seq := read.Packet.Header.SequenceNumber
assert.EqualValues(t, uint16(s), seq)
}
assert.NoError(t, stream.Close())
err = i.Close()
assert.NoError(t, err)
}

0 comments on commit 6d74945

Please sign in to comment.