dex/vendor/google.golang.org/api/examples/pubsub.go
2016-04-08 11:56:29 -07:00

325 lines
8.7 KiB
Go

package main
import (
"bufio"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/textproto"
"os"
"strings"
pubsub "google.golang.org/api/pubsub/v1beta1"
)
const USAGE = `Available arguments are:
PROJ list_topics
PROJ create_topic TOPIC
PROJ delete_topic TOPIC
PROJ list_subscriptions
PROJ create_subscription SUBSCRIPTION LINKED_TOPIC
PROJ delete_subscription SUBSCRIPTION
PROJ connect_irc TOPIC SERVER CHANNEL
PROJ pull_messages SUBSCRIPTION
`
type IRCBot struct {
server string
port string
nick string
user string
channel string
conn net.Conn
tpReader *textproto.Reader
}
func NewIRCBot(server, channel, nick string) *IRCBot {
return &IRCBot{
server: server,
port: "6667",
nick: nick,
channel: channel,
conn: nil,
user: nick,
}
}
func (bot *IRCBot) Connect() {
conn, err := net.Dial("tcp", bot.server+":"+bot.port)
if err != nil {
log.Fatal("unable to connect to IRC server ", err)
}
bot.conn = conn
log.Printf("Connected to IRC server %s (%s)\n",
bot.server, bot.conn.RemoteAddr())
bot.tpReader = textproto.NewReader(bufio.NewReader(bot.conn))
bot.Sendf("USER %s 8 * :%s\r\n", bot.nick, bot.nick)
bot.Sendf("NICK %s\r\n", bot.nick)
bot.Sendf("JOIN %s\r\n", bot.channel)
}
func (bot *IRCBot) CheckConnection() {
for {
line, err := bot.ReadLine()
if err != nil {
log.Fatal("Unable to read a line during checking the connection.")
}
if strings.Contains(line, "004") {
log.Println("The nick accepted.")
} else if strings.Contains(line, "433") {
log.Fatal("The nick is already in use.")
} else if strings.Contains(line, "366") {
log.Println("Starting to publish messages.")
return
}
}
}
func (bot *IRCBot) Sendf(format string, args ...interface{}) {
fmt.Fprintf(bot.conn, format, args...)
}
func (bot *IRCBot) Close() {
bot.conn.Close()
}
func (bot *IRCBot) ReadLine() (line string, err error) {
return bot.tpReader.ReadLine()
}
func init() {
registerDemo("pubsub", pubsub.PubsubScope, pubsubMain)
}
func pubsubUsage() {
fmt.Fprint(os.Stderr, USAGE)
}
// Returns a fully qualified resource name for Cloud Pub/Sub.
func fqrn(res, proj, name string) string {
return fmt.Sprintf("/%s/%s/%s", res, proj, name)
}
func fullTopicName(proj, topic string) string {
return fqrn("topics", proj, topic)
}
func fullSubscriptionName(proj, topic string) string {
return fqrn("subscriptions", proj, topic)
}
// Check the length of the arguments.
func checkArgs(argv []string, min int) {
if len(argv) < min {
pubsubUsage()
os.Exit(2)
}
}
func listTopics(service *pubsub.Service, argv []string) {
var nextPageToken string = ""
for {
query := service.Topics.List().Query(
fmt.Sprintf(
"cloud.googleapis.com/project in (/projects/%s)",
argv[0])).PageToken(nextPageToken)
topicsList, err := query.Do()
if err != nil {
log.Fatal("Got an error: %v", err)
}
for _, topic := range topicsList.Topic {
fmt.Println(topic.Name)
}
nextPageToken = topicsList.NextPageToken
if nextPageToken == "" {
break
}
}
}
func createTopic(service *pubsub.Service, argv []string) {
checkArgs(argv, 3)
topic := &pubsub.Topic{Name: fullTopicName(argv[0], argv[2])}
topic, err := service.Topics.Create(topic).Do()
if err != nil {
log.Fatal("Got an error: %v", err)
}
fmt.Printf("Topic %s was created.\n", topic.Name)
}
func deleteTopic(service *pubsub.Service, argv []string) {
checkArgs(argv, 3)
topicName := fullTopicName(argv[0], argv[2])
err := service.Topics.Delete(topicName).Do()
if err != nil {
log.Fatal("Got an error: %v", err)
}
fmt.Printf("Topic %s was deleted.\n", topicName)
}
func listSubscriptions(service *pubsub.Service, argv []string) {
var nextPageToken string = ""
for {
query := service.Subscriptions.List().Query(
fmt.Sprintf(
"cloud.googleapis.com/project in (/projects/%s)",
argv[0])).PageToken(nextPageToken)
subscriptionsList, err := query.Do()
if err != nil {
log.Fatal("Got an error: %v", err)
}
for _, subscription := range subscriptionsList.Subscription {
sub_text, _ := json.MarshalIndent(subscription, "", " ")
fmt.Printf("%s\n", sub_text)
}
nextPageToken = subscriptionsList.NextPageToken
if nextPageToken == "" {
break
}
}
}
func createSubscription(service *pubsub.Service, argv []string) {
checkArgs(argv, 4)
subscription := &pubsub.Subscription{
Name: fullSubscriptionName(argv[0], argv[2]),
Topic: fullTopicName(argv[0], argv[3]),
}
subscription, err := service.Subscriptions.Create(subscription).Do()
if err != nil {
log.Fatal("Got an error: %v", err)
}
fmt.Printf("Subscription %s was created.\n", subscription.Name)
}
func deleteSubscription(service *pubsub.Service, argv []string) {
checkArgs(argv, 3)
subscriptionName := fullSubscriptionName(argv[0], argv[2])
err := service.Subscriptions.Delete(subscriptionName).Do()
if err != nil {
log.Fatal("Got an error: %v", err)
}
fmt.Printf("Subscription %s was deleted.\n", subscriptionName)
}
func connectIRC(service *pubsub.Service, argv []string) {
checkArgs(argv, 5)
topicName := fullTopicName(argv[0], argv[2])
server := argv[3]
channel := argv[4]
nick := fmt.Sprintf("bot-%s", argv[2])
ircbot := NewIRCBot(server, channel, nick)
ircbot.Connect()
defer ircbot.Close()
ircbot.CheckConnection()
privMark := fmt.Sprintf("PRIVMSG %s :", ircbot.channel)
for {
line, err := ircbot.ReadLine()
if err != nil {
log.Fatal("Unable to read a line from the connection.")
}
parts := strings.Split(line, " ")
if len(parts) > 0 && parts[0] == "PING" {
ircbot.Sendf("PONG %s\r\n", parts[1])
} else {
pos := strings.Index(line, privMark)
if pos == -1 {
continue
}
privMsg := line[pos+len(privMark) : len(line)]
pubsubMessage := &pubsub.PubsubMessage{
Data: base64.StdEncoding.EncodeToString([]byte(privMsg)),
}
publishRequest := &pubsub.PublishRequest{
Message: pubsubMessage,
Topic: topicName,
}
service.Topics.Publish(publishRequest).Do()
log.Println("Published a message to the topic.")
}
}
}
func pullMessages(service *pubsub.Service, argv []string) {
checkArgs(argv, 3)
subscriptionName := fullSubscriptionName(argv[0], argv[2])
pullRequest := &pubsub.PullRequest{
ReturnImmediately: false,
Subscription: subscriptionName,
}
for {
pullResponse, err := service.Subscriptions.Pull(pullRequest).Do()
if err != nil {
log.Fatal("Got an error while pull a message: %v", err)
}
if pullResponse.PubsubEvent.Message != nil {
data, err := base64.StdEncoding.DecodeString(
pullResponse.PubsubEvent.Message.Data)
if err != nil {
log.Fatal("Got an error while decoding the message: %v", err)
}
fmt.Printf("%s\n", data)
ackRequest := &pubsub.AcknowledgeRequest{
AckId: []string{pullResponse.AckId},
Subscription: subscriptionName,
}
service.Subscriptions.Acknowledge(ackRequest).Do()
}
}
}
// This example demonstrates calling the Cloud Pub/Sub API. As of 20
// Aug 2014, the Cloud Pub/Sub API is only available if you're
// whitelisted. If you're interested in using it, please apply for the
// Limited Preview program at the following form:
// http://goo.gl/Wql9HL
//
// Also, before running this example, be sure to enable Cloud Pub/Sub
// service on your project in Developer Console at:
// https://console.developers.google.com/
//
// It has 8 subcommands as follows:
//
// PROJ list_topics
// PROJ create_topic TOPIC
// PROJ delete_topic TOPIC
// PROJ list_subscriptions
// PROJ create_subscription SUBSCRIPTION LINKED_TOPIC
// PROJ delete_subscription SUBSCRIPTION
// PROJ connect_irc TOPIC SERVER CHANNEL
// PROJ pull_messages SUBSCRIPTION
//
// You can use either of your alphanumerical or numerial Cloud Project
// ID for PROJ. You can choose any names for TOPIC and SUBSCRIPTION as
// long as they follow the naming rule described at:
// https://developers.google.com/pubsub/overview#names
//
// You can list/create/delete topics/subscriptions by self-explanatory
// subcommands, as well as connect to an IRC channel and publish
// messages from the IRC channel to a specified Cloud Pub/Sub topic by
// the "connect_irc" subcommand, or continuously pull messages from a
// specified Cloud Pub/Sub subscription and display the data by the
// "pull_messages" subcommand.
func pubsubMain(client *http.Client, argv []string) {
checkArgs(argv, 2)
service, _ := pubsub.New(client)
m := map[string]func(service *pubsub.Service, argv []string){
"list_topics": listTopics,
"list_subscriptions": listSubscriptions,
"create_topic": createTopic,
"delete_topic": deleteTopic,
"create_subscription": createSubscription,
"delete_subscription": deleteSubscription,
"connect_irc": connectIRC,
"pull_messages": pullMessages,
}
f, ok := m[argv[1]]
if !ok {
pubsubUsage()
os.Exit(2)
}
f(service, argv)
}