Skip to content

Commit 8c15bfa

Browse files
varsillrootjerzywilczek
authored
Add device server (#4)
* Add create_device * Fix the resource * Add device server * Add Device server and destroy function * Fix credo * Format the code * Implement reviewers suggestions --------- Co-authored-by: root <root@gpu2+sandbox> Co-authored-by: Jerzy Wilczek <jerzy.wilczek@swmansion.com>
1 parent 02fffd1 commit 8c15bfa

11 files changed

Lines changed: 158 additions & 48 deletions

File tree

lib/decoder.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule Membrane.VKVideo.Decoder do
55
"""
66
use Membrane.Filter
77

8-
alias Membrane.VKVideo.Native
8+
alias Membrane.VKVideo.{DeviceServer, Native}
99

1010
def_input_pad :input, accepted_format: %Membrane.H264{stream_structure: :annexb, alignment: :au}
1111
def_output_pad :output, accepted_format: %Membrane.RawVideo{pixel_format: :NV12}
@@ -18,7 +18,8 @@ defmodule Membrane.VKVideo.Decoder do
1818

1919
@impl true
2020
def handle_setup(_ctx, state) do
21-
{:ok, decoder} = Native.new_decoder()
21+
{:ok, device} = DeviceServer.get_device()
22+
{:ok, decoder} = Native.new_decoder(device)
2223
state = %{state | decoder: decoder}
2324
{[], state}
2425
end
@@ -71,7 +72,8 @@ defmodule Membrane.VKVideo.Decoder do
7172
@impl true
7273
def handle_end_of_stream(:input, _ctx, state) do
7374
{:ok, flushed_frames} = Native.flush_decoder(state.decoder)
75+
:ok = Native.destroy(state.decoder)
7476
{actions, state} = Enum.flat_map_reduce(flushed_frames, state, &prepare_actions(&1, &2))
75-
{actions ++ [end_of_stream: :output], state}
77+
{actions ++ [end_of_stream: :output], %{state | decoder: nil}}
7678
end
7779
end

lib/device_server.ex

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
defmodule Membrane.VKVideo.DeviceServer do
2+
@moduledoc false
3+
use GenServer
4+
alias Membrane.VKVideo.Native
5+
6+
@impl true
7+
def init(_opts) do
8+
{:ok, %{device: nil}}
9+
end
10+
11+
@impl true
12+
def handle_call(:get_device, _from, state) do
13+
state = maybe_create_device(state)
14+
{:reply, state.device, state}
15+
end
16+
17+
defp maybe_create_device(%{device: nil} = state) do
18+
device = Native.create_device()
19+
%{state | device: device}
20+
end
21+
22+
defp maybe_create_device(state), do: state
23+
24+
@spec get_device() :: {:ok, Native.t()} | no_return()
25+
def get_device() do
26+
GenServer.call(__MODULE__, :get_device)
27+
end
28+
end

lib/encoder.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule Membrane.VKVideo.Encoder do
77

88
require Membrane.Logger
99

10-
alias Membrane.VKVideo.Native
10+
alias Membrane.VKVideo.{DeviceServer, Native}
1111

1212
def_input_pad :input, accepted_format: %Membrane.RawVideo{pixel_format: :NV12}
1313

@@ -84,8 +84,11 @@ defmodule Membrane.VKVideo.Encoder do
8484
end
8585

8686
defp spawn_encoder(state) do
87+
{:ok, device} = DeviceServer.get_device()
88+
8789
{:ok, encoder} =
8890
Native.new_encoder(
91+
device,
8992
state.width,
9093
state.height,
9194
state.framerate,
@@ -130,6 +133,7 @@ defmodule Membrane.VKVideo.Encoder do
130133

131134
@impl true
132135
def handle_end_of_stream(:input, _ctx, state) do
136+
:ok = Native.destroy(state.encoder)
133137
state = %{state | encoder: nil}
134138
{[end_of_stream: :output], state}
135139
end

lib/native.ex

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ defmodule Membrane.VKVideo.Native do
1515
pts: non_neg_integer() | nil
1616
}
1717

18-
@spec new_decoder() :: {:ok, t()} | no_return()
19-
def new_decoder(), do: :erlang.nif_error(:nif_not_loaded)
18+
@spec create_device() :: {:ok, t()} | no_return()
19+
def create_device(), do: :erlang.nif_error(:nif_not_loaded)
20+
21+
@spec new_decoder(t()) :: {:ok, t()} | no_return()
22+
def new_decoder(_device), do: :erlang.nif_error(:nif_not_loaded)
2023

2124
@spec decode(t(), binary(), pts_ns :: non_neg_integer() | nil) ::
2225
{:ok, raw_frame()} | no_return()
@@ -27,6 +30,7 @@ defmodule Membrane.VKVideo.Native do
2730
def flush_decoder(_decoder), do: :erlang.nif_error(:nif_not_loaded)
2831

2932
@spec new_encoder(
33+
t(),
3034
non_neg_integer(),
3135
non_neg_integer(),
3236
{non_neg_integer(), non_neg_integer()},
@@ -36,10 +40,13 @@ defmodule Membrane.VKVideo.Native do
3640
| {:variable_bitrate, Membrane.VKVideo.Encoder.VariableBitrate.t()}
3741
| {:constant_bitrate, Membrane.VKVideo.Encoder.ConstantBitrate.t()}
3842
) :: {:ok, t()} | no_return()
39-
def new_encoder(_width, _height, _framerate, _tune, _rate_control),
43+
def new_encoder(_device, _width, _height, _framerate, _tune, _rate_control),
4044
do: :erlang.nif_error(:nif_not_loaded)
4145

4246
@spec encode(t(), binary(), pts_ns :: non_neg_integer() | nil) ::
4347
{:ok, encoded_frame()} | no_return()
4448
def encode(_encoder, _raw_frame, _pts \\ nil), do: :erlang.nif_error(:nif_not_loaded)
49+
50+
@spec destroy(t()) :: :ok
51+
def destroy(_resource), do: :erlang.nif_error(:nif_not_loaded)
4552
end

lib/vk_video.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
defmodule Membrane.VKVideo do
2+
@moduledoc false
3+
use Application
4+
5+
alias Membrane.VKVideo.DeviceServer
6+
7+
@impl true
8+
def start(_start_type, _start_args) do
9+
GenServer.start_link(DeviceServer, nil, name: DeviceServer)
10+
end
11+
end

mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ defmodule Membrane.VKVideo.Mixfile do
2828

2929
def application do
3030
[
31-
extra_applications: []
31+
extra_applications: [],
32+
mod: {Membrane.VKVideo, []}
3233
]
3334
end
3435

native/vkvideo/src/decoder.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1+
use crate::ok;
12
use crate::Resource;
23
use rustler::{Atom, Binary, Env, Error, NifStruct, OwnedBinary, ResourceArc};
34
use std::sync::Mutex;
45
use vk_video::{parameters::DecoderParameters, BytesDecoder, EncodedInputChunk};
56

6-
rustler::atoms! {
7-
ok,
8-
}
97
pub struct DecoderResource {
108
pub decoder_mutex: Mutex<BytesDecoder>,
119
}
@@ -19,16 +17,14 @@ pub struct RawFrame<'a> {
1917
pub height: u32,
2018
}
2119

22-
pub fn new() -> Result<(Atom, ResourceArc<Resource>), Error> {
23-
let instance = vk_video::VulkanInstance::new()
24-
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
25-
let adapter = instance
26-
.create_adapter(None)
27-
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
28-
let device = adapter
29-
.create_device(wgpu::Features::empty(), wgpu::Limits::default())
30-
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
31-
let decoder = device
20+
pub fn new(
21+
_env: Env,
22+
resource: ResourceArc<Resource>,
23+
) -> Result<(Atom, ResourceArc<Resource>), Error> {
24+
let decoder = resource
25+
.device()
26+
.ok_or_else(|| Error::BadArg)?
27+
.device
3228
.create_bytes_decoder(DecoderParameters::default())
3329
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
3430
let decoder_mutex = Mutex::new(decoder);

native/vkvideo/src/encoder.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
1+
use crate::ok;
12
use crate::Resource;
23
use rustler::{Atom, Error, NifTaggedEnum, ResourceArc};
34
use rustler::{Binary, Env, NifStruct, NifUnitEnum, OwnedBinary};
45
use std::sync::Mutex;
56
use vk_video::parameters::{RateControl, Rational, VideoParameters};
67
use vk_video::{BytesEncoder, Frame, RawFrameData};
78

8-
rustler::atoms! {
9-
ok
10-
}
11-
129
pub struct EncoderResource {
13-
pub encoder_mutex: Mutex<BytesEncoder>,
10+
pub encoder_mutex: Mutex<Option<BytesEncoder>>,
1411
pub width: u32,
1512
pub height: u32,
1613
}
@@ -75,22 +72,17 @@ impl Into<RateControl> for EncoderRateControl {
7572
}
7673

7774
pub fn new(
75+
_env: Env,
76+
resource: ResourceArc<Resource>,
7877
width: u32,
7978
height: u32,
8079
frame_rate: (u32, u32),
8180
tune: EncoderTune,
8281
rate_control: EncoderRateControl,
8382
) -> Result<(Atom, ResourceArc<Resource>), Error> {
83+
let device_resource = &resource.device().ok_or_else(|| Error::BadArg)?.device;
8484
let non_zero_width = std::num::NonZero::new(width).ok_or(Error::BadArg)?;
8585
let non_zero_height = std::num::NonZero::new(height).ok_or(Error::BadArg)?;
86-
let instance = vk_video::VulkanInstance::new()
87-
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
88-
let adapter = instance
89-
.create_adapter(None)
90-
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
91-
let device = adapter
92-
.create_device(wgpu::Features::empty(), wgpu::Limits::default())
93-
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
9486

9587
let video_parameters = VideoParameters {
9688
width: non_zero_width,
@@ -102,18 +94,18 @@ pub fn new(
10294
};
10395

10496
let parameters = match tune {
105-
EncoderTune::LowLatency => device
97+
EncoderTune::LowLatency => device_resource
10698
.encoder_parameters_low_latency(video_parameters, rate_control.into())
10799
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?,
108-
EncoderTune::HighQuality => device
100+
EncoderTune::HighQuality => device_resource
109101
.encoder_parameters_high_quality(video_parameters, rate_control.into())
110102
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?,
111103
};
112104

113-
let encoder = device
105+
let encoder = device_resource
114106
.create_bytes_encoder(parameters)
115107
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
116-
let encoder_mutex = Mutex::new(encoder);
108+
let encoder_mutex = Mutex::new(Some(encoder));
117109
let encoder_resource = EncoderResource {
118110
encoder_mutex,
119111
width,
@@ -140,11 +132,13 @@ pub fn encode<'a>(
140132
pts: pts_ns,
141133
};
142134

143-
let mut encoder = encoder_resource
135+
let mut guard = encoder_resource
144136
.encoder_mutex
145137
.lock()
146138
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
147139

140+
let encoder = guard.as_mut().ok_or(Error::BadArg)?;
141+
148142
let encoded_frame = encoder
149143
.encode(&frame, false)
150144
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;

native/vkvideo/src/lib.rs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,77 @@
11
use decoder::{DecoderResource, RawFrame};
22
use encoder::{EncodedFrame, EncoderRateControl, EncoderResource, EncoderTune};
33
use rustler::{Atom, Binary, Env, Error, ResourceArc, Term};
4+
use std::sync::Arc;
5+
use vk_video::VulkanDevice;
6+
7+
rustler::atoms! {
8+
ok,
9+
}
410

511
pub mod decoder;
612
pub mod encoder;
713

814
pub enum Resource {
915
Encoder(EncoderResource),
1016
Decoder(DecoderResource),
17+
Device(DeviceResource),
18+
}
19+
20+
pub struct DeviceResource {
21+
pub device: Arc<VulkanDevice>,
1122
}
1223

24+
impl std::panic::RefUnwindSafe for DeviceResource {}
25+
1326
impl Resource {
1427
pub fn encoder(&self) -> Option<&EncoderResource> {
1528
match self {
1629
Self::Encoder(encoder_resource) => Some(encoder_resource),
17-
Self::Decoder(_) => None,
30+
_ => None,
1831
}
1932
}
2033

2134
pub fn decoder(&self) -> Option<&DecoderResource> {
2235
match self {
2336
Self::Decoder(decoder_resource) => Some(decoder_resource),
24-
Self::Encoder(_) => None,
37+
_ => None,
38+
}
39+
}
40+
41+
pub fn device(&self) -> Option<&DeviceResource> {
42+
match self {
43+
Self::Device(device_resource) => Some(device_resource),
44+
_ => None,
2545
}
2646
}
2747
}
2848

49+
#[allow(non_local_definitions)]
2950
fn load(env: Env, _: Term) -> bool {
3051
rustler::resource!(Resource, env)
3152
}
3253

3354
#[rustler::nif(schedule = "DirtyIo")]
34-
fn new_decoder() -> Result<(Atom, ResourceArc<Resource>), Error> {
35-
decoder::new()
55+
fn create_device() -> Result<(Atom, ResourceArc<Resource>), Error> {
56+
let instance = vk_video::VulkanInstance::new()
57+
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
58+
let adapter = instance
59+
.create_adapter(None)
60+
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
61+
let device = adapter
62+
.create_device(wgpu::Features::empty(), wgpu::Limits::default())
63+
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
64+
65+
let device_resource = ResourceArc::new(Resource::Device(DeviceResource { device }));
66+
Ok((ok(), device_resource))
67+
}
68+
69+
#[rustler::nif(schedule = "DirtyIo")]
70+
fn new_decoder(
71+
env: Env,
72+
resource: ResourceArc<Resource>,
73+
) -> Result<(Atom, ResourceArc<Resource>), Error> {
74+
decoder::new(env, resource)
3675
}
3776

3877
#[rustler::nif(schedule = "DirtyIo")]
@@ -55,13 +94,15 @@ pub fn flush_decoder(
5594

5695
#[rustler::nif(schedule = "DirtyIo")]
5796
fn new_encoder(
97+
env: Env,
98+
resource: ResourceArc<Resource>,
5899
width: u32,
59100
height: u32,
60101
frame_rate: (u32, u32),
61102
tune: EncoderTune,
62103
rate_control: EncoderRateControl,
63104
) -> Result<(Atom, ResourceArc<Resource>), Error> {
64-
encoder::new(width, height, frame_rate, tune, rate_control)
105+
encoder::new(env, resource, width, height, frame_rate, tune, rate_control)
65106
}
66107

67108
#[rustler::nif(schedule = "DirtyIo")]
@@ -74,4 +115,17 @@ fn encode<'a>(
74115
encoder::encode(env, resource, bytes, pts_ns)
75116
}
76117

118+
#[rustler::nif(schedule = "DirtyIo")]
119+
fn destroy<'a>(env: Env<'a>, resource: ResourceArc<Resource>) -> Result<Atom, Error> {
120+
if let Resource::Encoder(encoder) = &*resource {
121+
let mut encoder = encoder
122+
.encoder_mutex
123+
.lock()
124+
.map_err(|err| Error::RaiseTerm(Box::new(err.to_string())))?;
125+
*encoder = None;
126+
}
127+
128+
Ok(ok())
129+
}
130+
77131
rustler::init!("Elixir.Membrane.VKVideo.Native", load = load);

0 commit comments

Comments
 (0)