- Visual Studio Code
- GoLand
- Cursor
- CodeBuddy
- Trae
- Qoder
- Claude Code
- Kiro
- Windsurf
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
$ go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
- Create a Go project, e.g., MyPlugin
- Create a pkg directory for exportable code
- Create a pb directory for gRPC proto files
- Create an example directory for testing the plugin
You can also create a directory xxx directly in the monibuca project's plugin folder to store your plugin code.
package plugin_myplugin
import (
"m7s.live/v5"
)
var _ = m7s.InstallPlugin[MyPlugin]()
type MyPlugin struct {
m7s.Plugin
Foo string
}
- The MyPlugin struct is the plugin definition; Foo is a plugin property that can be configured in the configuration file
- Must embed the m7s.Plugin struct to provide basic plugin functionality
- m7s.InstallPlugin[MyPlugin](...) registers the plugin so it can be loaded by monibuca
Example:
const defaultConfig = m7s.DefaultYaml(`tcp:
listenaddr: :5554`)
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
DefaultYaml: defaultConfig,
})
func (config *MyPlugin) Start() (err error) {
// Initialize things
return
}
Used for plugin initialization after the configuration is loaded. If initialization fails, return an error and the plugin will be disabled.
func (config *MyPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
}
Called when a TCP connection request is received, if a TCP listening port is configured.
func (config *MyPlugin) OnUDPConnect(conn *net.UDPConn) task.ITask {
}
Called when a UDP connection request is received, if a UDP listening port is configured.
func (config *MyPlugin) OnQUICConnect(quic.Connection) task.ITask {
}
Called when a QUIC connection request is received, if a QUIC listening port is configured.
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
// do something
}
Accessible via http://ip:port/myplugin/api/test1
This method supports parameterized routing:
// RegisterHandler returns the plugin's custom HTTP routes, keyed by route
// pattern. The "{streamPath...}" segment is a path wildcard; the handler
// reads the captured value with r.PathValue("streamPath").
func (config *MyPlugin) RegisterHandler() map[string]http.HandlerFunc {
	routes := make(map[string]http.HandlerFunc, 1)
	routes["/test1/{streamPath...}"] = config.test1
	return routes
}
func (config *MyPlugin) test1(rw http.ResponseWriter, r *http.Request) {
streamPath := r.PathValue("streamPath")
// do something
}
A push client needs to implement the IPusher interface and pass its creation function to InstallPlugin.
// Pusher is an example push client. It embeds task.Task and holds the push
// job that GetPushJob exposes.
type Pusher struct {
task.Task
pushJob m7s.PushJob
}
// GetPushJob returns a pointer to the pusher's pushJob field.
func (c *Pusher) GetPushJob() *m7s.PushJob {
return &c.pushJob
}
// NewPusher creates an empty Pusher; the push configuration argument is
// ignored. Pass this function as PluginMeta.NewPusher when installing the
// plugin.
func NewPusher(_ config.Push) m7s.IPusher {
	pusher := new(Pusher)
	return pusher
}
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
NewPusher: NewPusher,
})
A pull client needs to implement the IPuller interface and pass its creation function to InstallPlugin. The following Puller embeds m7s.HTTPFilePuller for basic file and HTTP pulling. Override the Start method to implement your specific pulling logic:
// Puller is an example pull client built by embedding m7s.HTTPFilePuller.
type Puller struct {
m7s.HTTPFilePuller
}
// NewPuller creates an empty Puller; the pull configuration argument is
// ignored. Pass this function as PluginMeta.NewPuller when installing the
// plugin.
func NewPuller(_ config.Pull) m7s.IPuller {
	puller := new(Puller)
	return puller
}
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
NewPuller: NewPuller,
})
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
package myplugin;
option go_package="m7s.live/v5/plugin/myplugin/pb";
service api {
rpc MyMethod (MyRequest) returns (MyResponse) {
option (google.api.http) = {
post: "/myplugin/api/bar"
body: "foo"
};
}
}
message MyRequest {
string foo = 1;
}
message MyResponse {
string bar = 1;
}
Add to VSCode tasks.json:
{
"type": "shell",
"label": "build pb myplugin",
"command": "protoc",
"args": [
"-I.",
"-I${workspaceRoot}/pb",
"--go_out=.",
"--go_opt=paths=source_relative",
"--go-grpc_out=.",
"--go-grpc_opt=paths=source_relative",
"--grpc-gateway_out=.",
"--grpc-gateway_opt=paths=source_relative",
"myplugin.proto"
],
"options": {
"cwd": "${workspaceRoot}/plugin/myplugin/pb"
}
}
Or run the command in the pb directory:
protoc -I. -I$ProjectFileDir$/pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative myplugin.proto
Replace $ProjectFileDir$ with the directory containing the global pb files.
Create api.go:
package plugin_myplugin

import (
	"context"

	"m7s.live/v5"
	"m7s.live/v5/plugin/myplugin/pb"
)

// MyMethod implements the MyMethod RPC declared in myplugin.proto. It echoes
// the request's Foo field back in the response's Bar field.
func (config *MyPlugin) MyMethod(ctx context.Context, req *pb.MyRequest) (*pb.MyResponse, error) {
	return &pb.MyResponse{Bar: req.Foo}, nil
}

package plugin_myplugin
import (
"m7s.live/v5"
"m7s.live/v5/plugin/myplugin/pb"
)
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
ServiceDesc: &pb.Api_ServiceDesc,
RegisterGRPCHandler: pb.RegisterApiHandler,
})
type MyPlugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Foo string
}
Same as v4:
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
// do something
}
Accessible via a GET request to /myplugin/api/test1
publisher, err := p.Publish(ctx, streamPath)
Both the ctx and streamPath parameters are required.
The old WriteAudio and WriteVideo methods have been replaced with a more structured writer pattern using generics:
// Audio writer
audioWriter := m7s.NewPublishAudioWriter[*AudioFrame](publisher, allocator)
// Video writer
videoWriter := m7s.NewPublishVideoWriter[*VideoFrame](publisher, allocator)
// Combined audio/video writer
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
// Set timestamp and write audio frame
writer.AudioFrame.SetTS32(timestamp)
err := writer.NextAudio()
// Set timestamp and write video frame
writer.VideoFrame.SetTS32(timestamp)
err := writer.NextVideo()
// For custom data frames
err := publisher.WriteData(data IDataFrame)
If the existing audio/video data formats don't meet your needs, you can define custom formats by implementing this interface:
IAVFrame interface {
GetSample() *Sample
GetSize() int
CheckCodecChange() error
Demux() error // demux to raw format
Mux(*Sample) error // mux from origin format
Recycle()
String() string
}
Define separate types for audio and video.
The methods serve the following purposes:
- GetSample: Gets the Sample object containing codec context and raw data
- GetSize: Gets the size of audio/video data
- CheckCodecChange: Checks if the codec has changed
- Demux: Demuxes audio/video data to raw format for use by other formats
- Mux: Muxes from original format to custom audio/video data format
- Recycle: Recycles resources, automatically implemented when embedding RecyclableMemory
- String: Prints audio/video data information
The new pattern includes built-in memory management:
- gomem.ScalableMemoryAllocator - For efficient memory allocation
- Frame recycling through the Recycle() method
- Automatic memory pool management
var suber *m7s.Subscriber
suber, err = p.Subscribe(ctx,streamPath)
go m7s.PlayBlock(suber, handleAudio, handleVideo)
Note that handleAudio and handleVideo are callback functions you need to implement. Each takes an audio/video format type as input and returns an error. If the error is not nil, the subscription is terminated.
The H26xFrame struct is used for handling H.264/H.265 raw stream data:
type H26xFrame struct {
pkg.Sample
}
Key characteristics:
- Embeds pkg.Sample - contains codec context, memory management, and timing
- Uses Raw.(*pkg.Nalus) to store NALU (Network Abstraction Layer Unit) data
- Supports both H.264 (AVC) and H.265 (HEVC) formats
- Uses efficient memory allocators for zero-copy operations
import (
	"context"
	"time"

	"m7s.live/v5"
	"m7s.live/v5/pkg/format"
	"m7s.live/v5/pkg/util"
)
// publishRawH264Stream publishes a batch of raw H.264 frames to streamPath as
// H26xFrame video, stamping them at 30 FPS. Each element of h264Frames is
// treated as a single NALU. It returns the first error from Publish or from
// writing a frame.
//
// Note: this is a demonstration of multi-frame writing. In actual scenarios,
// frames should be written gradually as they are received from the video
// source.
func publishRawH264Stream(streamPath string, h264Frames [][]byte) error {
	// Get publisher. p is the plugin instance; Publish requires a context.
	publisher, err := p.Publish(context.Background(), streamPath)
	if err != nil {
		return err
	}
	// Release the publisher when done, as continuousH264Publishing does.
	defer publisher.Dispose()

	// Create memory allocator
	allocator := gomem.NewScalableMemoryAllocator(1 << gomem.MinPowerOf2)
	defer allocator.Recycle()

	// Create writer for H26xFrame
	writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)

	// Set up H264 codec context
	writer.VideoFrame.ICodecCtx = &format.H264{}

	// Publish multiple frames
	startTime := time.Now()
	for i, frameData := range h264Frames {
		// Fill the writer's frame for this iteration
		frame := writer.VideoFrame

		// Set timestamp with proper interval
		frame.Timestamp = startTime.Add(time.Duration(i) * time.Second / 30) // 30 FPS

		// Write NALU data. This assumes frameData is a single NALU; if it
		// holds several, loop and push each one separately.
		nalus := frame.GetNalus()
		naluPtr := nalus.GetNextPointer() // not named p, to avoid shadowing the plugin
		mem := frame.NextN(len(frameData))
		copy(mem, frameData)
		naluPtr.PushOne(mem)

		// Publish frame
		if err := writer.NextVideo(); err != nil {
			return err
		}
	}
	return nil
}
// Example usage with continuous streaming
func continuousH264Publishing(streamPath string, frameSource <-chan []byte, stopChan <-chan struct{}) error {
// Get publisher
publisher, err := p.Publish(streamPath)
if err != nil {
return err
}
defer publisher.Dispose()
// Create memory allocator
allocator := gomem.NewScalableMemoryAllocator(1 << gomem.MinPowerOf2)
defer allocator.Recycle()
// Create writer for H26xFrame
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
// Set up H264 codec context
writer.VideoFrame.ICodecCtx = &format.H264{}
startTime := time.Now()
frameCount := 0
for {
select {
case frameData := <-frameSource:
// Create H26xFrame for each frame
frame := writer.VideoFrame
// Set timestamp with proper interval
frame.Timestamp = startTime.Add(time.Duration(frameCount) * time.Second / 30) // 30 FPS
// Write NALU data
nalus := frame.GetNalus()
mem := frame.NextN(len(frameData))
copy(mem, frameData)
// Publish frame
if err := writer.NextVideo(); err != nil {
return err
}
frameCount++
case <-stopChan:
// Stop publishing
return nil
}
}
}type MyTransform struct {
m7s.DefaultTransformer
Writer *m7s.PublishWriter[*format.RawAudio, *format.H26xFrame]
}
// Go runs the transform loop. Every frame received from t.Video is handed to
// processH26xFrame; the first failure is logged and ends the loop. The
// transformer is disposed when the method returns.
func (t *MyTransform) Go() {
	defer t.Dispose()
	for frame := range t.Video {
		err := t.processH26xFrame(frame)
		if err == nil {
			continue
		}
		t.Error("process frame failed", "error", err)
		return
	}
}
func (t *MyTransform) processH26xFrame(video *format.H26xFrame) error {
// Copy frame metadata
copyVideo := t.Writer.VideoFrame
copyVideo.ICodecCtx = video.ICodecCtx
*copyVideo.BaseSample = *video.BaseSample
nalus := copyVideo.GetNalus()
// Process each NALU unit
for nalu := range video.Raw.(*pkg.Nalus).RangePoint {
p := nalus.GetNextPointer()
mem := copyVideo.NextN(nalu.Size)
nalu.CopyTo(mem)
// Example: Filter or modify specific NALU types
if video.FourCC() == codec.FourCC_H264 {
switch codec.ParseH264NALUType(mem[0]) {
case codec.NALU_IDR_Picture, codec.NALU_Non_IDR_Picture:
// Process video frame NALUs
// Example: Apply transformations, filters, etc.
case codec.NALU_SPS, codec.NALU_PPS:
// Process parameter set NALUs
}
} else if video.FourCC() == codec.FourCC_H265 {
switch codec.ParseH265NALUType(mem[0]) {
case h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL:
// Process H.265 IDR frames
}
}
// Push processed NALU
p.PushOne(mem)
}
return t.Writer.NextVideo()
}const (
NALU_Non_IDR_Picture = 1 // Non-IDR picture (P-frames)
NALU_IDR_Picture = 5 // IDR picture (I-frames)
NALU_SEI = 6 // Supplemental enhancement information
NALU_SPS = 7 // Sequence parameter set
NALU_PPS = 8 // Picture parameter set
)
// Parse NALU type from first byte
naluType := codec.ParseH264NALUType(mem[0])
// Parse H.265 NALU type from first byte
naluType := codec.ParseH265NALUType(mem[0])
// Use memory allocators for efficient operations
allocator := gomem.NewScalableMemoryAllocator(1 << 20) // 1MB initial size
defer allocator.Recycle()
// When processing multiple frames, reuse the same allocator
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
func processFrame(video *format.H26xFrame) error {
// Check codec changes
if err := video.CheckCodecChange(); err != nil {
return err
}
// Validate frame data
if video.Raw == nil {
return fmt.Errorf("empty frame data")
}
// Process NALUs safely
nalus, ok := video.Raw.(*pkg.Nalus)
if !ok {
return fmt.Errorf("invalid NALUs format")
}
// Process frame...
return nil
}
Just implement the Collector interface, and the system will automatically collect metrics from all plugins:
// Describe is the descriptor half of the Collector interface. This stub sends
// nothing; a real plugin sends the descriptor of each metric it exposes on ch.
func (p *MyPlugin) Describe(ch chan<- *prometheus.Desc) {
}
// Collect is the metric half of the Collector interface. This stub sends
// nothing; a real plugin sends the current value of each metric on ch.
func (p *MyPlugin) Collect(ch chan<- prometheus.Metric) {
}