Skip to content

Commit

Permalink
start sse
Browse files Browse the repository at this point in the history
  • Loading branch information
porjo committed Jan 19, 2025
1 parent bccc2ef commit 6918b6b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 65 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/porjo/ytdl-web

go 1.23

require github.com/gorilla/websocket v1.5.3
require (
github.com/gorilla/websocket v1.5.3
github.com/tmaxmax/go-sse v0.10.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/tmaxmax/go-sse v0.10.0 h1:j9F93WB4Hxt8wUf6oGffMm4dutALvUPoDDxfuDQOSqA=
github.com/tmaxmax/go-sse v0.10.0/go.mod h1:u/2kZQR1tyngo1lKaNCj1mJmhXGZWS1Zs5yiSOD+Eg8=
8 changes: 8 additions & 0 deletions html/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ var seekTimer = null;
var trackId = null;

$(function(){

var sseHost = window.location.protocol + "//" + window.location.host;

const evtSource = new EventSource(sseHost + "/sse");
evtSource.onmessage = (event) => {
console.log("sse event", event);
}

var ws = new WebSocket(ws_uri);

const url = new URL(window.location);
Expand Down
85 changes: 23 additions & 62 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/porjo/ytdl-web/internal/jobs"
"github.com/porjo/ytdl-web/internal/util"
"github.com/porjo/ytdl-web/internal/ytworker"
"github.com/tmaxmax/go-sse"
)

const (
Expand All @@ -35,24 +36,12 @@ const (
DefaultExpiry = 24 * time.Hour
)

var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
)

type Request struct {
URL string
DeleteURLs []string `json:"delete_urls"`
}

type Conn struct {
sync.Mutex
*websocket.Conn
}

type wsHandler struct {
type dlHandler struct {
WebRoot string
OutPath string
RemoteAddr string
Expand All @@ -61,55 +50,24 @@ type wsHandler struct {
Downloader *ytworker.Download

Logger *slog.Logger

SSE *sse.Server
}

func (ws *wsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (dl *dlHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

ctx, cancel := context.WithCancel(r.Context())
defer cancel()

gconn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
slog.Error(err.Error())
return
}

id, outCh := ws.Downloader.Subscribe()
defer ws.Downloader.Unsubscribe(id)
id, outCh := dl.Downloader.Subscribe()
defer dl.Downloader.Unsubscribe(id)

ws.RemoteAddr = gconn.RemoteAddr().String()
logger := ws.Logger.With("ws", id)
dl.RemoteAddr = r.RemoteAddr
logger := dl.Logger.With("dl", id)
logger.Info("client connected")

// wrap Gorilla conn with our conn so we can extend functionality
conn := Conn{sync.Mutex{}, gconn}

wg := sync.WaitGroup{}

wg.Add(1)
// setup ping/pong to keep connection open
go func() {
ticker := time.NewTicker(WSPingInterval)
defer ticker.Stop()
defer wg.Done()

for {
select {
case <-ctx.Done():
logger.Info("ping, context done")
return
case <-ticker.C:
//slog.Debug("ping")
// WriteControl can be called concurrently
if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WSWriteWait)); err != nil {
logger.Error("ping client error", "error", err)
cancel()
return
}
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -123,35 +81,38 @@ func (ws *wsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !open {
return
}
err := conn.writeMsg(m)
if err != nil {
m := util.Msg{Key: "error", Value: err.Error()}
conn.writeMsg(m)
}
sseM := &sse.Message{}
j, _ := m.JSON()
sseM.AppendData(string(j))

if m.Key == ytworker.KeyCompleted {
// on completion, also send recent URLs
gruCtx, _ := context.WithTimeout(ctx, 10*time.Second)
recentURLs, err := GetRecentURLs(gruCtx, ws.WebRoot, ws.OutPath, ws.FFProbeCmd)
recentURLs, err := GetRecentURLs(gruCtx, dl.WebRoot, dl.OutPath, dl.FFProbeCmd)
if err != nil {
logger.Error("GetRecentURLS error", "error", err)
return
}
m := util.Msg{Key: "recent", Value: recentURLs}
conn.writeMsg(m)
j, _ = m.JSON()
sseM.AppendData(string(j))
}
dl.SSE.Publish(sseM)
}
}
}()

sseM := &sse.Message{}
// Send recently retrieved URLs
gruCtx, _ := context.WithTimeout(ctx, 10*time.Second)
recentURLs, err := GetRecentURLs(gruCtx, ws.WebRoot, ws.OutPath, ws.FFProbeCmd)
recentURLs, err := GetRecentURLs(gruCtx, dl.WebRoot, dl.OutPath, dl.FFProbeCmd)
if err != nil {
logger.Error("GetRecentURLS error", "error", err)
return
}
m := util.Msg{Key: "recent", Value: recentURLs}
conn.writeMsg(m)
j, _ = m.JSON()
dl.SSE.Publish(sseM)

wg.Add(1)
go func() {
Expand All @@ -178,7 +139,7 @@ func (ws *wsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

err := ws.msgHandler(req)
err := dl.msgHandler(req)
if err != nil {
logger.Error("msgHandler error", "error", err)
m := util.Msg{Key: "error", Value: err.Error()}
Expand Down Expand Up @@ -211,7 +172,7 @@ func (c *Conn) writeMsg(val interface{}) error {
return nil
}

func (ws *wsHandler) msgHandler(req Request) error {
func (ws *dlHandler) msgHandler(req Request) error {

if req.URL == "" && len(req.DeleteURLs) == 0 {
return fmt.Errorf("unknown parameters")
Expand Down
9 changes: 9 additions & 0 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"context"
"encoding/json"
"log/slog"
"runtime"
)
Expand Down Expand Up @@ -30,3 +31,11 @@ type Msg struct {
Key string
Value interface{}
}

func (m Msg) JSON() ([]byte, error) {
b, err := json.Marshal(m)
if err != nil {
return nil, err
}
return b, nil
}
19 changes: 17 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/porjo/ytdl-web/internal/jobs"
"github.com/porjo/ytdl-web/internal/ytworker"
"github.com/tmaxmax/go-sse"
)

const MaxProcessTime = time.Second * 300
Expand Down Expand Up @@ -72,18 +73,32 @@ func main() {
dispatcher.Start(ctx)
}()

ws := &wsHandler{
s := &sse.Server{}

dlh := &dlHandler{
WebRoot: *webRoot,
OutPath: *outPath,
FFProbeCmd: *ffprobeCmd,
Dispatcher: dispatcher,
Downloader: dl,
Logger: logger,
SSE: s,
}
http.Handle("/websocket", ws)
http.HandleFunc("/dl/stream/", ServeStream(*webRoot))
http.Handle("/", http.FileServer(http.Dir(*webRoot)))

go func() {
m := &sse.Message{}
m.AppendData("Hello world")

for range time.Tick(time.Second) {
_ = s.Publish(m)
}
}()

http.Handle("/sse", s)
http.Handle("/dl", dlh)

slog.Info("starting cleanup routine...")
go fileCleanup(filepath.Join(*webRoot, *outPath), *expiry)

Expand Down

0 comments on commit 6918b6b

Please sign in to comment.