// Command main captures YUYV frames from a V4L2 device, condenses every frame
// into a sequence of BLAKE2b-256 digests and streams the result as binary
// WebSocket messages to wss://yebi.su/ws.
package main

import (
	"context"
	"flag"
	"log"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/gorilla/websocket"
	dev "github.com/vladimirvivien/go4vl/device"
	"github.com/vladimirvivien/go4vl/v4l2"
	"golang.org/x/crypto/blake2b"
)

// redialDelay is how long sender waits after a failed dial before trying
// again, so an unreachable server is not hammered in a tight loop.
const redialDelay = time.Second

// sender forwards every payload received on queue as one binary WebSocket
// message, authenticating with the X-Secret header. It sleeps cooldown
// milliseconds after each successful write. When dialing or writing fails the
// connection is re-established; the payload whose write failed is dropped.
// sender returns only when queue is closed.
func sender(queue chan []byte, secret string, cooldown uint) {
	log.Println("start sender")

	header := make(http.Header, 1)
	header.Set("X-Secret", secret)
	pause := time.Duration(cooldown) * time.Millisecond

	for {
		ws, _, err := websocket.DefaultDialer.Dial("wss://yebi.su/ws", header)
		if err != nil {
			log.Printf("dial error: %v", err)
			time.Sleep(redialDelay)
			continue
		}
		log.Println("connected")

		for {
			data, ok := <-queue
			if !ok {
				// Queue closed: nothing more will ever arrive.
				ws.Close()
				return
			}
			if err := ws.WriteMessage(websocket.BinaryMessage, data); err != nil {
				log.Printf("WriteMessage() failed: %v", err)
				break
			}
			time.Sleep(pause)
		}
		ws.Close()
	}
}

// chunkBy splits items into consecutive sub-slices of exactly chunkSize
// elements. A trailing remainder shorter than chunkSize is discarded, so
// callers may index every element of a chunk without bounds checks. It returns
// nil when chunkSize is not positive or items holds fewer than chunkSize
// elements. The chunks alias items' backing array, with their capacity capped
// so that appending to a chunk cannot overwrite its neighbour.
func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
	if chunkSize <= 0 || len(items) < chunkSize {
		return nil
	}
	chunks = make([][]T, 0, len(items)/chunkSize)
	// "<=" (not "<") so that the last full chunk is emitted as well.
	for chunkSize <= len(items) {
		items, chunks = items[chunkSize:], append(chunks, items[0:chunkSize:chunkSize])
	}
	return chunks
}

// distill reduces a raw frame to concatenated BLAKE2b-256 digests in three
// stages: XOR each pair of adjacent bytes, pack the lowest bit of every eight
// resulting bytes into one byte (first byte in the most significant bit), and
// hash every 32 packed bytes. Input that does not fill a whole chunk at any
// stage is discarded.
func distill(frame []byte) []byte {
	mixed := make([]byte, 0, len(frame)/2)
	for _, pair := range chunkBy(frame, 2) {
		mixed = append(mixed, pair[0]^pair[1])
	}

	packed := make([]byte, 0, len(mixed)/8)
	for _, group := range chunkBy(mixed, 8) {
		var b byte
		for j, v := range group {
			if v&1 != 0 {
				b |= 1 << (7 - j)
			}
		}
		packed = append(packed, b)
	}

	digests := make([]byte, 0, len(packed)/32*blake2b.Size256)
	for _, block := range chunkBy(packed, 32) {
		sum := blake2b.Sum256(block)
		digests = append(digests, sum[:]...)
	}
	return digests
}

// processor distills every frame received on inQueue and sends the non-empty
// results to outQueue. It returns when inQueue is closed.
func processor(inQueue chan []byte, outQueue chan []byte) {
	log.Println("start processor")
	for frame := range inQueue {
		out := distill(frame)
		if len(out) == 0 {
			// Frame too small to yield a digest; do not emit an empty message.
			continue
		}
		outQueue <- out
	}
}

func main() {
	var (
		deviceName      string
		bufferSize      uint
		processorsCount uint
		sendersCount    uint
		sendCooldown    uint
		ident           string
		secret          string
	)

	flag.StringVar(&deviceName, "device", "/dev/video0", "v4l2 device to capture from")
	flag.UintVar(&bufferSize, "bufferSize", 1024, "sending queue max capacity")
	flag.UintVar(&processorsCount, "processorsCount", 1, "count of processor goroutines")
	flag.UintVar(&sendersCount, "sendersCount", 1, "count of sender goroutines")
	flag.UintVar(&sendCooldown, "sendCooldown", 100, "send cooldown (in ms)")
	flag.StringVar(&ident, "ident", "", "yebi.su ident")
	flag.StringVar(&secret, "secret", "", "yebi.su secret")
	flag.Parse()

	if ident == "" {
		log.Panicln("ident not specified")
	}
	if secret == "" {
		log.Panicln("secret not specified")
	}
	secret = ident + " " + secret

	log.Printf("open v4l2 device: %s\n", deviceName)
	device, err := dev.Open(deviceName)
	if err != nil {
		log.Panicln("failed to open device:", err)
	}
	defer device.Close()

	frameSizes, err := v4l2.GetFormatFrameSizes(device.Fd(), v4l2.PixelFmtYUYV)
	if err != nil {
		log.Panicln("failed to get supported frame sizes:", err)
	}
	if len(frameSizes) == 0 {
		log.Panicln("device reports no YUYV frame sizes")
	}

	// Capture at the minimum dimensions of the first reported frame size.
	if err := device.SetPixFormat(v4l2.PixFormat{
		Width:       frameSizes[0].Size.MinWidth,
		Height:      frameSizes[0].Size.MinHeight,
		PixelFormat: v4l2.PixelFmtYUYV,
		Field:       v4l2.FieldNone,
	}); err != nil {
		log.Panicln("failed to set device pixel format:", err)
	}

	processorQueue := make(chan []byte, bufferSize)
	senderQueue := make(chan []byte, bufferSize)
	// main is the only sender on processorQueue, so it may close it safely.
	// senderQueue is deliberately left open: processor goroutines may still be
	// sending on it during shutdown, and closing it here could make them panic.
	defer close(processorQueue)

	for i := 0; i < int(sendersCount); i++ {
		go sender(senderQueue, secret, sendCooldown)
	}
	for i := 0; i < int(processorsCount); i++ {
		go processor(processorQueue, senderQueue)
	}

	log.Println("start recording")
	ctx, stop := context.WithCancel(context.Background())
	defer stop()
	if err := device.Start(ctx); err != nil {
		log.Panicln("failed to start recording on device:", err)
	}

	log.Println("start loop")
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	for {
		select {
		case frame, ok := <-device.GetOutput():
			if !ok {
				// A closed output channel would otherwise yield empty frames
				// forever and turn this loop into a busy spin.
				log.Println("device output closed")
				return
			}
			if len(frame) == 0 {
				continue
			}
			// Keep honouring the interrupt while waiting for queue space.
			select {
			case processorQueue <- frame:
			case <-interrupt:
				log.Println("interrupted")
				return
			}
		case <-interrupt:
			log.Println("interrupted")
			return
		}
	}
}