Skip to content

Commit

Permalink
[RTC-435] Add tracks and their metadata (#141)
Browse files Browse the repository at this point in the history
* Add tracks to state

* Add track metadata

* Log errors

* Fix openapi lint

* Add track meta

* Add peer metadata

* Test for  message in FileComponent test

* Improve handling Engine messages

* Improve logging

* Remove unused deps

* RTC-engine dependency to master

* Improve handling unexpected messages

* Remove track encoding

* Remove unused cases

* Fix logging

* Remove unused deps

* Format

* Fix sending track_removed notifications

* PR remarks

* Synchronize protos

* Add lint ignores
  • Loading branch information
roznawsk authored Feb 7, 2024
1 parent 8f6e1ca commit 36e1d00
Show file tree
Hide file tree
Showing 28 changed files with 714 additions and 142 deletions.
5 changes: 5 additions & 0 deletions .redocly.lint-ignore.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@
openapi.yaml:
no-empty-servers:
- '#/servers'
operation-4xx-response:
- '#/paths/~1health/get/responses'
spec:
- '#/components/schemas/Track/properties/metadata/nullable'
- '#/components/schemas/PeerMetadata/nullable'
2 changes: 2 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ config :phoenix, :stacktrace_depth, 20

# Initialize plugs at runtime for faster development compilation
config :phoenix, :plug_init_mode, :runtime

config :logger, level: :info
6 changes: 4 additions & 2 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ defmodule Jellyfish.Component do
use Bunch.Access

alias Jellyfish.Component.{File, HLS, RTSP}
alias Jellyfish.Track

@enforce_keys [
:id,
:type,
:engine_endpoint,
:properties
]
defstruct @enforce_keys
defstruct @enforce_keys ++ [tracks: %{}]

@type id :: String.t()
@type component :: HLS | RTSP | File
Expand All @@ -35,7 +36,8 @@ defmodule Jellyfish.Component do
id: id(),
type: component(),
engine_endpoint: Membrane.ChildrenSpec.child_definition(),
properties: properties()
properties: properties(),
tracks: %{Track.id() => Track.t()}
}

@spec parse_type(String.t()) :: {:ok, component()} | {:error, :invalid_type}
Expand Down
61 changes: 60 additions & 1 deletion lib/jellyfish/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@ defmodule Jellyfish.Event do
PeerConnected,
PeerCrashed,
PeerDisconnected,
PeerMetadataUpdated,
RoomCrashed,
RoomCreated,
RoomDeleted
RoomDeleted,
Track,
TrackAdded,
TrackMetadataUpdated,
TrackRemoved
}

alias Membrane.RTC.Engine.Message

@pubsub Jellyfish.PubSub
@valid_topics [:server_notification, :metrics]

Expand Down Expand Up @@ -59,6 +66,38 @@ defmodule Jellyfish.Event do
defp to_proto_server_notification({:component_crashed, room_id, component_id}),
do: {:component_crashed, %ComponentCrashed{room_id: room_id, component_id: component_id}}

defp to_proto_server_notification({:peer_metadata_updated, room_id, peer_id, metadata}),
do:
{:peer_metadata_updated,
%PeerMetadataUpdated{room_id: room_id, peer_id: peer_id, metadata: Jason.encode!(metadata)}}

defp to_proto_server_notification({:track_added, room_id, endpoint_info, track_info}) do
{:track_added,
%TrackAdded{
room_id: room_id,
endpoint_info: endpoint_info,
track: to_proto_track(track_info)
}}
end

defp to_proto_server_notification({:track_removed, room_id, endpoint_info, track_info}) do
{:track_removed,
%TrackRemoved{
room_id: room_id,
endpoint_info: endpoint_info,
track: to_proto_track(track_info)
}}
end

defp to_proto_server_notification({:track_metadata_updated, room_id, endpoint_info, track_id}) do
{:track_metadata_updated,
%TrackMetadataUpdated{
room_id: room_id,
endpoint_info: endpoint_info,
track: to_proto_track(track_id)
}}
end

defp to_proto_server_notification({:hls_playable, room_id, component_id}),
do: {:hls_playable, %HlsPlayable{room_id: room_id, component_id: component_id}}

Expand All @@ -67,4 +106,24 @@ defmodule Jellyfish.Event do

defp to_proto_server_notification({:hls_upload_crashed, room_id}),
do: {:hls_upload_crashed, %HlsUploadCrashed{room_id: room_id}}

defp to_proto_track(%Jellyfish.Track{} = track) do
%Track{
id: track.id,
type: to_proto_track_type(track.type),
metadata: Jason.encode!(track.metadata)
}
end

defp to_proto_track(%Message.TrackAdded{} = track) do
%Track{
id: track.track_id,
type: to_proto_track_type(track.track_type),
metadata: Jason.encode!(track.track_metadata)
}
end

defp to_proto_track_type(:video), do: :TRACK_TYPE_VIDEO
defp to_proto_track_type(:audio), do: :TRACK_TYPE_AUDIO
defp to_proto_track_type(_type), do: :TRACK_TYPE_UNSPECIFIED
end
8 changes: 6 additions & 2 deletions lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ defmodule Jellyfish.Peer do
Peer is an entity that connects to the server to publish, subscribe or publish and subscribe to tracks published by
producers or other peers. Peer process is spawned after peer connects to the server.
"""
use Bunch.Access

alias Jellyfish.Peer.WebRTC
alias Jellyfish.Track

@enforce_keys [
:id,
:type,
:engine_endpoint
]
defstruct @enforce_keys ++ [status: :disconnected, socket_pid: nil]
defstruct @enforce_keys ++ [status: :disconnected, socket_pid: nil, tracks: %{}, metadata: nil]

@type id :: String.t()
@type peer :: WebRTC
Expand All @@ -28,7 +30,9 @@ defmodule Jellyfish.Peer do
type: peer(),
status: status(),
socket_pid: pid() | nil,
engine_endpoint: Membrane.ChildrenSpec.child_definition()
engine_endpoint: Membrane.ChildrenSpec.child_definition(),
tracks: %{Track.id() => Track.t()},
metadata: any()
}

@spec parse_type(String.t()) :: {:ok, peer()} | {:error, :invalid_type}
Expand Down
1 change: 0 additions & 1 deletion lib/jellyfish/peer/webrtc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ defmodule Jellyfish.Peer.WebRTC do
handshake_opts: handshake_options,
filter_codecs: filter_codecs,
log_metadata: [peer_id: options.peer_id],
trace_context: nil,
extensions: %{opus: Membrane.RTP.VAD},
webrtc_extensions: webrtc_extensions,
simulcast_config: %SimulcastConfig{
Expand Down
Loading

0 comments on commit 36e1d00

Please sign in to comment.