package mastodon

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"net/url"
	"path"
	"strings"
)

// UpdateEvent is the Event sent when the stream reports an "update" event.
// It carries the Status decoded from the event's data payload.
type UpdateEvent struct {
	Status *Status `json:"status"`
}

func (e *UpdateEvent) event() {}

// NotificationEvent is the Event sent when the stream reports a
// "notification" event. It carries the Notification decoded from the event's
// data payload.
type NotificationEvent struct {
	Notification *Notification `json:"notification"`
}

func (e *NotificationEvent) event() {}

// DeleteEvent is the Event sent when the stream reports a "delete" event.
// ID is the whitespace-trimmed data payload of that event.
type DeleteEvent struct{ ID ID }

func (e *DeleteEvent) event() {}

// ErrorEvent is the Event sent when connecting to the stream, reading from it,
// or decoding one of its payloads fails. It also implements the error
// interface.
type ErrorEvent struct{ err error }

func (e *ErrorEvent) event() {}

// Error returns the message of the underlying error.
func (e *ErrorEvent) Error() string { return e.err.Error() }

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause of an ErrorEvent.
func (e *ErrorEvent) Unwrap() error { return e.err }

// Event is implemented by every value sent on a streaming channel. The
// unexported marker method keeps direct implementations inside this package.
type Event interface {
	event()
}

// handleReader reads r line by line, treating each line as "field:value".
//
// An "event" line records the current event name. A "data" line is decoded
// according to that name ("update", "notification" or "delete") and the
// resulting Event is sent on q; data for any other event name is ignored.
// Lines without a colon, and lines with any other field name, are skipped.
//
// A JSON decoding failure is reported on q as an ErrorEvent and reading
// continues. handleReader returns nil when r reaches io.EOF and returns any
// other read error to the caller. Every send on q blocks until it is received
// (or buffered, if q has capacity).
func handleReader(q chan Event, r io.Reader) error {
	var name string
	var lineBuf bytes.Buffer
	br := bufio.NewReader(r)
	for {
		line, isPrefix, err := br.ReadLine()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
		if isPrefix {
			// The line is longer than the reader's buffer; keep collecting
			// fragments until ReadLine reports the final one.
			lineBuf.Write(line)
			continue
		}
		if lineBuf.Len() > 0 {
			lineBuf.Write(line)
			line = lineBuf.Bytes()
			// Reset only truncates the buffer; line stays valid until the
			// next write, and it is copied into a string just below.
			lineBuf.Reset()
		}

		token := strings.SplitN(string(line), ":", 2)
		if len(token) != 2 {
			continue
		}
		switch strings.TrimSpace(token[0]) {
		case "event":
			name = strings.TrimSpace(token[1])
		case "data":
			var decodeErr error
			switch name {
			case "update":
				var status Status
				decodeErr = json.Unmarshal([]byte(token[1]), &status)
				if decodeErr == nil {
					q <- &UpdateEvent{&status}
				}
			case "notification":
				var notification Notification
				decodeErr = json.Unmarshal([]byte(token[1]), &notification)
				if decodeErr == nil {
					q <- &NotificationEvent{&notification}
				}
			case "delete":
				q <- &DeleteEvent{ID: ID(strings.TrimSpace(token[1]))}
			}
			if decodeErr != nil {
				q <- &ErrorEvent{decodeErr}
			}
		}
	}
}

// streaming opens the streaming endpoint "/api/v1/streaming/<p>" on the
// configured server with the given query parameters and returns a channel on
// which the decoded events are delivered.
//
// It returns an error only if the server URL cannot be parsed or the request
// cannot be built. Connection and stream errors are delivered on the channel
// as ErrorEvent values, after which the endpoint is requested again.
//
// NOTE: the request context is context.Background(), which is never
// cancelled, so the background goroutine has no way to stop and the returned
// channel is never closed in practice. A send also blocks until the caller
// receives from the channel.
func streaming(p string, params url.Values) (chan Event, error) {
	// checkInit is defined elsewhere in the package; presumably it verifies
	// that the package-level client has been set up (confirm at definition).
	checkInit()
	c := client
	ctx := context.Background()

	u, err := url.Parse(c.Config.Server)
	if err != nil {
		return nil, err
	}
	u.Path = path.Join(u.Path, "/api/v1/streaming", p)
	u.RawQuery = params.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}
	if c.Config.AccessToken != "" {
		req.Header.Set("Authorization", "Bearer "+c.Config.AccessToken)
	}

	q := make(chan Event)
	go func() {
		defer close(q)
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}
			// Each call is one connection attempt; when it returns, the loop
			// starts the next one.
			doStreaming(req, q)
		}
	}()
	return q, nil
}

// doStreaming performs one streaming request and forwards its events to q
// until the response body ends. A transport failure, a non-200 response, or a
// read error is reported on q as an ErrorEvent.
func doStreaming(req *http.Request, q chan Event) {
	resp, err := client.Do(req)
	if err != nil {
		q <- &ErrorEvent{err}
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		q <- &ErrorEvent{parseAPIError("bad request", resp)}
		return
	}

	err = handleReader(q, resp.Body)
	if err != nil {
		q <- &ErrorEvent{err}
	}
}

// StreamingUser returns a channel to read events on home.
func StreamingUser() (chan Event, error) {
	return streaming("user", nil)
}

// StreamingPublic returns a channel to read events on public. If isLocal is
// true, the "public/local" stream is used instead of "public".
func StreamingPublic(isLocal bool) (chan Event, error) {
	p := "public"
	if isLocal {
		p = path.Join(p, "local")
	}
	return streaming(p, nil)
}

// StreamingHashtag returns a channel to read events on the timeline of the
// given tag. If isLocal is true, the "hashtag/local" stream is used instead of
// "hashtag".
func StreamingHashtag(tag string, isLocal bool) (chan Event, error) {
	params := url.Values{}
	params.Set("tag", tag)
	p := "hashtag"
	if isLocal {
		p = path.Join(p, "local")
	}
	return streaming(p, params)
}

// Streaming returns a channel to read events on the list l.
func (l *List) Streaming() (chan Event, error) {
	params := url.Values{}
	params.Set("list", l.GetID())
	return streaming("list", params)
}

// StreamingDirect returns a channel to read events on direct messages.
func StreamingDirect() (chan Event, error) {
	return streaming("direct", nil)
}