Skip to content

Commit

Permalink
Move buffer for RTP packets into internal
Browse files Browse the repository at this point in the history
Can be used by NACK and JitterBuffer now
  • Loading branch information
Sean-Der committed Oct 10, 2024
1 parent a6975b0 commit bff0e3d
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 237 deletions.
15 changes: 15 additions & 0 deletions internal/rtpbuffer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpbuffer

import "errors"

// ErrInvalidSize is returned by newReceiveLog/newRTPBuffer, when an incorrect buffer size is supplied.
var ErrInvalidSize = errors.New("invalid buffer size")

var (
errPacketReleased = errors.New("could not retain packet, already released")
errFailedToCastHeaderPool = errors.New("could not access header pool, failed cast")
errFailedToCastPayloadPool = errors.New("could not access payload pool, failed cast")
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package nack
package rtpbuffer

import (
"encoding/binary"
Expand All @@ -11,16 +11,22 @@ import (
"github.com/pion/rtp"
)

const maxPayloadLen = 1460
// PacketFactory allows custom logic around the handle of RTP Packets before they added to the RTPBuffer.
// The NoOpPacketFactory doesn't copy packets, while the RetainablePacket will take a copy before adding
type PacketFactory interface {
NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error)
}

type packetManager struct {
// PacketFactoryCopy is PacketFactory that takes a copy of packets when added to the RTPBuffer
type PacketFactoryCopy struct {
headerPool *sync.Pool
payloadPool *sync.Pool
rtxSequencer rtp.Sequencer
}

func newPacketManager() *packetManager {
return &packetManager{
// NewPacketFactoryCopy constructs a PacketFactory that takes a copy of packets when added to the RTPBuffer
func NewPacketFactoryCopy() *PacketFactoryCopy {
return &PacketFactoryCopy{
headerPool: &sync.Pool{
New: func() interface{} {
return &rtp.Header{}
Expand All @@ -36,12 +42,13 @@ func newPacketManager() *packetManager {
}
}

func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*retainablePacket, error) {
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) {
if len(payload) > maxPayloadLen {
return nil, io.ErrShortBuffer
}

p := &retainablePacket{
p := &RetainablePacket{
onRelease: m.releasePacket,
sequenceNumber: header.SequenceNumber,
// new packets have retain count of 1
Expand Down Expand Up @@ -92,17 +99,19 @@ func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc ui
return p, nil
}

func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
func (m *PacketFactoryCopy) releasePacket(header *rtp.Header, payload *[]byte) {
m.headerPool.Put(header)
if payload != nil {
m.payloadPool.Put(payload)
}
}

type noOpPacketFactory struct{}
// PacketFactoryNoOp is a PacketFactory implementation that doesn't copy packets
type PacketFactoryNoOp struct{}

func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*retainablePacket, error) {
return &retainablePacket{
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*RetainablePacket, error) {
return &RetainablePacket{
onRelease: f.releasePacket,
count: 1,
header: header,
Expand All @@ -111,52 +120,6 @@ func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint
}, nil
}

func (f *noOpPacketFactory) releasePacket(_ *rtp.Header, _ *[]byte) {
func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) {
// no-op
}

type retainablePacket struct {
onRelease func(*rtp.Header, *[]byte)

countMu sync.Mutex
count int

header *rtp.Header
buffer *[]byte
payload []byte

sequenceNumber uint16
}

func (p *retainablePacket) Header() *rtp.Header {
return p.header
}

func (p *retainablePacket) Payload() []byte {
return p.payload
}

func (p *retainablePacket) Retain() error {
p.countMu.Lock()
defer p.countMu.Unlock()
if p.count == 0 {
// already released
return errPacketReleased
}
p.count++
return nil
}

func (p *retainablePacket) Release() {
p.countMu.Lock()
defer p.countMu.Unlock()
p.count--

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
}
}
61 changes: 61 additions & 0 deletions internal/rtpbuffer/retainable_packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpbuffer

import (
"sync"

"github.com/pion/rtp"
)

// RetainablePacket is a referenced counted RTP packet
type RetainablePacket struct {
onRelease func(*rtp.Header, *[]byte)

countMu sync.Mutex
count int

header *rtp.Header
buffer *[]byte
payload []byte

sequenceNumber uint16
}

// Header returns the RTP Header of the RetainablePacket
func (p *RetainablePacket) Header() *rtp.Header {
return p.header
}

// Payload returns the RTP Payload of the RetainablePacket
func (p *RetainablePacket) Payload() []byte {
return p.payload
}

// Retain increases the reference count of the RetainablePacket
func (p *RetainablePacket) Retain() error {
p.countMu.Lock()
defer p.countMu.Unlock()
if p.count == 0 {
// already released
return errPacketReleased
}
p.count++
return nil
}

// Release decreases the reference count of the RetainablePacket and frees if needed
func (p *RetainablePacket) Release() {
p.countMu.Lock()
defer p.countMu.Unlock()
p.count--

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
}
}
103 changes: 103 additions & 0 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package rtpbuffer provides a buffer for storing RTP packets
package rtpbuffer

import (
"fmt"
)

const (
// Uint16SizeHalf is half of a math.Uint16
Uint16SizeHalf = 1 << 15

maxPayloadLen = 1460
)

// RTPBuffer stores RTP packets and allows custom logic around the lifetime of them via the PacketFactory
type RTPBuffer struct {
packets []*RetainablePacket
size uint16
lastAdded uint16
started bool
}

// NewRTPBuffer constructs a new RTPBuffer
func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
allowedSizes := make([]uint16, 0)
correctSize := false
for i := 0; i < 16; i++ {
if size == 1<<i {
correctSize = true
break
}
allowedSizes = append(allowedSizes, 1<<i)
}

if !correctSize {
return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes)
}

return &RTPBuffer{
packets: make([]*RetainablePacket, size),
size: size,
}, nil
}

// Add places the RetainablePacket in the RTPBuffer
func (r *RTPBuffer) Add(packet *RetainablePacket) {
seq := packet.sequenceNumber
if !r.started {
r.packets[seq%r.size] = packet
r.lastAdded = seq
r.started = true
return
}

diff := seq - r.lastAdded
if diff == 0 {
return
} else if diff < Uint16SizeHalf {
for i := r.lastAdded + 1; i != seq; i++ {
idx := i % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release()
}
r.packets[idx] = nil
}
}

idx := seq % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release()
}
r.packets[idx] = packet
r.lastAdded = seq
}

// Get returns the RetainablePacket for the requested sequence number
func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
diff := r.lastAdded - seq
if diff >= Uint16SizeHalf {
return nil
}

if diff >= r.size {
return nil
}

pkt := r.packets[seq%r.size]
if pkt != nil {
if pkt.sequenceNumber != seq {
return nil
}
// already released
if err := pkt.Retain(); err != nil {
return nil
}
}
return pkt
}
Loading

0 comments on commit bff0e3d

Please sign in to comment.