add streaming

This commit is contained in:
Yasuhiro Matsumoto 2017-04-14 09:32:46 +09:00
parent 106cec241e
commit ae1467926c
2 changed files with 117 additions and 8 deletions

View File

@ -3,16 +3,19 @@ package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
"github.com/fatih/color"
"github.com/mattn/go-mastodon"
"github.com/mattn/go-tty"
"golang.org/x/net/html"
@ -20,6 +23,7 @@ import (
var (
toot = flag.String("t", "", "toot text")
stream = flag.Bool("S", false, "streaming public")
)
func extractText(node *html.Node, w *bytes.Buffer) {
@ -38,6 +42,16 @@ func extractText(node *html.Node, w *bytes.Buffer) {
}
}
func textContent(s string) string {
doc, err := html.Parse(strings.NewReader(s))
if err != nil {
log.Fatal(err)
}
var buf bytes.Buffer
extractText(doc, &buf)
return buf.String()
}
func prompt() (string, string, error) {
t, err := tty.Open()
if err != nil {
@ -132,19 +146,39 @@ func main() {
}
return
}
if *stream {
ctx, cancel := context.WithCancel(context.Background())
sc := make(chan os.Signal, 1)
signal.Notify(sc, os.Interrupt)
q, err := client.StreamingPublic(ctx)
if err != nil {
log.Fatal(err)
}
go func() {
<-sc
cancel()
close(q)
}()
for e := range q {
switch t := e.(type) {
case *mastodon.UpdateEvent:
color.Set(color.FgHiRed)
fmt.Println(t.Status.Account.Username)
color.Set(color.Reset)
fmt.Println(textContent(t.Status.Content))
}
}
return
}
timeline, err := client.GetTimelineHome()
if err != nil {
log.Fatal(err)
}
for _, t := range timeline {
doc, err := html.Parse(strings.NewReader(t.Content))
if err != nil {
log.Fatal(err)
}
var buf bytes.Buffer
extractText(doc, &buf)
color.Set(color.FgHiRed)
fmt.Println(t.Account.Username)
fmt.Println(buf.String())
color.Set(color.Reset)
fmt.Println(textContent(t.Content))
}
}

View File

@ -1,7 +1,10 @@
package mastodon
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
@ -228,3 +231,75 @@ func (c *client) PostStatus(toot *Toot) (*Status, error) {
}
return &status, nil
}
type UpdateEvent struct {
Status *Status
}
func (e *UpdateEvent) event() {}
type NotificationEvent struct {
}
func (e *NotificationEvent) event() {}
type DeleteEvent struct {
ID int64
}
func (e *DeleteEvent) event() {}
type Event interface {
event()
}
func (c *client) StreamingPublic(ctx context.Context) (chan Event, error) {
url, err := url.Parse(c.config.Server)
if err != nil {
return nil, err
}
url.Path = path.Join(url.Path, "/api/v1/streaming/public")
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.config.AccessToken)
resp, err := c.Do(req)
if err != nil {
return nil, err
}
q := make(chan Event)
go func() {
defer ctx.Done()
name := ""
s := bufio.NewScanner(resp.Body)
for s.Scan() {
line := s.Text()
token := strings.SplitN(line, ":", 2)
if len(token) != 2 {
continue
}
switch strings.TrimSpace(token[0]) {
case "event":
name = strings.TrimSpace(token[1])
case "data":
switch name {
case "update":
var status Status
json.Unmarshal([]byte(token[1]), &status)
q <- &UpdateEvent{&status}
case "notification":
case "delete":
}
}
}
fmt.Println(s.Err())
}()
go func() {
<-ctx.Done()
resp.Body.Close()
}()
return q, nil
}