...
 
Commits (23)
......@@ -14,5 +14,11 @@
# Dependency directories (remove the comment below to include it)
# vendor/
/jetstats*
/*.db
# Ignore env files
.env
# Ignore build binary
jetstats
# Ignore db files
*.db
---
linters:
enable:
- bodyclose
......@@ -6,23 +7,19 @@ linters:
- dogsled
- dupl
- errcheck
- funlen
- gocognit
- goconst
- gocritic
- gocyclo
- godox
- gofmt
- goimports
- golint
- gomnd
- goprintffuncname
- gosec
- gosimple
- govet
- ineffassign
- interfacer
- lll
- maligned
- misspell
- nakedret
......
.DEFAULT_GOAL := run
.DEFAULT_GOAL := build
SHELL := bash
install:
......@@ -7,11 +7,11 @@ install:
build:
go build -o jetstats
run:
go run .
lint:
golangci-lint --color always run
golangci-lint --color always run --fix
test:
go test -v ./...
e2e:
source e2e.sh && go test -v ./...
package app
import (
"git.oolsa.net/jooola/jetstats/app/mailjet"
"git.oolsa.net/jooola/jetstats/sender"
"git.oolsa.net/jooola/jetstats/store"
"github.com/gorilla/mux"
influxdb "github.com/influxdata/influxdb-client-go"
mailjet "github.com/mailjet/mailjet-apiv3-go"
cron "github.com/robfig/cron/v3"
bolt "go.etcd.io/bbolt"
)
// App is a mailstats application
// App is a jetstats application
type App struct {
config *Config
store *bolt.DB
router *mux.Router
cron *cron.Cron
sending bool
config *Config
router *mux.Router
mailjet *mailjet.Mailjet
mailjetClient *mailjet.Client
influxdbClient influxdb.Client
store store.Store
sender sender.Sender
}
// NewApp creates a new app and initiate its components
......@@ -26,21 +23,17 @@ func NewApp(config *Config) *App {
a := &App{}
a.config = config
a.store = NewStore(a)
a.mailjetClient = NewMailjetClient(a)
a.influxdbClient = NewInfluxdbClient(a)
a.mailjet = mailjet.New()
a.store = store.New("boltdb")
a.sender = sender.New("influxdb")
return a
}
// NewServer creates a server from a new app
func NewServer(config *Config) *App {
config.AllowWrite = true
a := NewApp(config)
a.router = NewRouter(a)
a.cron = NewCron(a)
a.cron.Start()
return a
}
......@@ -48,5 +41,5 @@ func NewServer(config *Config) *App {
// Close closes all application components
func (a *App) Close() {
a.store.Close()
a.influxdbClient.Close()
a.sender.Close()
}
......@@ -4,38 +4,26 @@ import (
"log"
"net/url"
"github.com/jinzhu/configor"
env "github.com/caarlos0/env/v6"
)
// Config defines the application configuration
type Config struct {
ListenAddr string `default:"127.0.0.1:7000" yaml:"listen_addr"`
PublicURL string `required:"true" yaml:"public_url"`
ListenAddr string `env:"LISTEN_ADDR" envDefault:"127.0.0.1:3000"`
PublicURL string `env:"PUBLIC_URL,required"`
publicURL *url.URL
SendInterval string `default:"1h" yaml:"send_interval"`
Verbose bool `default:"false" yaml:"verbose"`
Debug bool `default:"false" yaml:"debug"`
AllowWrite bool
Verbose bool `env:"VERBOSE" envDefault:"false"`
Debug bool `env:"DEBUG" envDefault:"false"`
Database string `default:"data.db" yaml:"database"`
WebhookUser string `required:"true" yaml:"webhook_user"`
WebhookPass string `required:"true" yaml:"webhook_pass"`
MailjetUser string `required:"true" yaml:"mailjet_user"`
MailjetPass string `required:"true" yaml:"mailjet_pass"`
InfluxdbAddr string `required:"true" yaml:"influxdb_addr"`
InfluxdbToken string `required:"true" yaml:"influxdb_token"`
InfluxdbOrg string `required:"true" yaml:"influxdb_org"`
InfluxdbBucket string `required:"true" yaml:"influxdb_bucket"`
WebhookUser string `env:"WEBHOOK_USER,required"`
WebhookPass string `env:"WEBHOOK_PASS,required"`
}
// NewConfig return a new configuration from path
func NewConfig(path string) *Config {
// NewConfig return a new configuration from environement
func NewConfig() *Config {
c := Config{}
if err := configor.Load(&c, path); err != nil {
if err := env.Parse(&c); err != nil {
log.Fatalln(err)
}
......
package app
import (
"io/ioutil"
"log"
)
func newTestConfig() *Config {
tmpfile, err := ioutil.TempFile("", "jetstats.*.db")
if err != nil {
log.Fatalln(err)
}
return &Config{
AllowWrite: true,
PublicURL: "http://127.0.0.1:7000",
Database: tmpfile.Name(),
WebhookUser: "user",
WebhookPass: "pass",
MailjetUser: "user",
MailjetPass: "pass",
InfluxdbAddr: "http://127.0.0.1:9999",
InfluxdbToken: "pass",
}
}
package app
import (
"fmt"
"log"
cron "github.com/robfig/cron/v3"
)
// NewCron create a new cron component
func NewCron(a *App) *cron.Cron {
c := cron.New()
log.Printf("cron will send points every %s\n", a.config.SendInterval)
if _, err := c.AddFunc(fmt.Sprintf("@every %s", a.config.SendInterval), func() {
if !a.sending {
a.sending = true
if err := a.Send(); err != nil {
log.Println(err)
}
a.sending = false
}
}); err != nil {
log.Fatalln(err)
}
return c
}
package event
import (
"testing"
)
func TestBlockedEvent(t *testing.T) {
event := testEventParsing(t, BlockedEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "blocked@mailjet.com"},
{tags["mj_message_id"], "13792286917004336"},
{tags["mj_campaign_id"], "380"},
{tags["mj_contact_id"], "19092"},
{tags["error_related_to"], "recipient"},
{tags["error"], "user unknown"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
import (
"testing"
)
func TestBounceEvent(t *testing.T) {
event := testEventParsing(t, BounceEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "bounce@mailjet.com"},
{tags["mj_message_id"], "13792286917004336"},
{tags["mj_campaign_id"], "2"},
{tags["mj_contact_id"], "199"},
{tags["error_related_to"], "recipient"},
{tags["error"], "user unknown"},
{tags["blocked"], "false"},
{tags["hard_bounce"], "true"},
{tags["comment"], "Host or domain name not found: Host not found"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
// ClickEvent define a click event occurred on mailjet side
type ClickEvent struct {
OpenEvent
URL string `json:"url"` // the link that was clicked
}
// Point return all tags and fields for influxdb
func (e *ClickEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.OpenEvent.Point()
tags["url"] = e.URL
return tags, fields
}
// Bytes convert the event to a json body
func (e *ClickEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestClickEvent(t *testing.T) {
event := testEventParsing(t, ClickEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "api@mailjet.com"},
{tags["mj_message_id"], "19421777836302490"},
{tags["mj_campaign_id"], "7272"},
{tags["mj_contact_id"], "4"},
{tags["ip"], "127.0.0.1"},
{tags["geo"], "FR"},
{tags["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0) AppleWebKit/537.36"},
{tags["url"], "https://mailjet.com"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
import (
"testing"
)
func TestOpenEvent(t *testing.T) {
event := testEventParsing(t, OpenEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "api@mailjet.com"},
{tags["mj_message_id"], "19421777396190490"},
{tags["mj_campaign_id"], "7173"},
{tags["mj_contact_id"], "320"},
{tags["ip"], "127.0.0.1"},
{tags["geo"], "US"},
{tags["user_agent"], "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
// SentEvent define a sent event occurred on mailjet side
type SentEvent struct {
baseEvent
SMTPReply string `json:"smtp_reply"` // The raw SMTP response message
}
// Point return all tags and fields for influxdb
func (e *SentEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
tags["smtp_reply"] = e.SMTPReply
return tags, fields
}
// Bytes convert the event to a json body
func (e *SentEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestSentEvent(t *testing.T) {
event := testEventParsing(t, SentEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "api@mailjet.com"},
{tags["mj_message_id"], "19421777835146490"},
{tags["mj_campaign_id"], "7257"},
{tags["mj_contact_id"], "4"},
{tags["smtp_reply"], "sent (250 2.0.0 OK 1433333948 fa5si855896wjc.199 - gsmtp)"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
import (
"testing"
)
func TestSpamEvent(t *testing.T) {
event := testEventParsing(t, SpamEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "bounce@mailjet.com"},
{tags["mj_message_id"], "13792286917004336"},
{tags["mj_campaign_id"], "1898"},
{tags["mj_contact_id"], "10920"},
{tags["source"], "JMRPP"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
import "fmt"
// UnsubEvent define a unsub event occurred on mailjet side
type UnsubEvent struct {
OpenEvent
MailjetListID uint64 `json:"mj_list_id"` // internal Mailjet list ID associated to the contact
}
// Point return all tags and fields for influxdb
func (e *UnsubEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.OpenEvent.Point()
tags["mj_list_id"] = fmt.Sprintf("%d", e.MailjetListID)
return tags, fields
}
// Bytes convert the event to a json body
func (e *UnsubEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestUnsubEvent(t *testing.T) {
event := testEventParsing(t, UnsubEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{fields["email"], "api@mailjet.com"},
{tags["mj_message_id"], "20547674933128000"},
{tags["mj_campaign_id"], "7276"},
{tags["mj_contact_id"], "126"},
{tags["ip"], "127.0.0.1"},
{tags["geo"], "FR"},
{tags["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0)"},
{tags["mj_list_id"], "1"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
......@@ -4,7 +4,7 @@ import (
"log"
"net/http"
"git.oolsa.net/jooola/jetstats/app/event"
"git.oolsa.net/jooola/jetstats/events"
"github.com/gorilla/mux"
)
......@@ -20,56 +20,59 @@ func InitWebhooksHandlers(a *App, r *mux.Router) {
r.HandleFunc("/sent", a.WebhookSentHandler).Methods(http.MethodPost)
}
func (a *App) webhookHandler(w http.ResponseWriter, r *http.Request, evt event.Type) {
ev, err := event.Parse(evt, r.Body)
func (a *App) webhookHandler(w http.ResponseWriter, r *http.Request, eventType string) {
event, err := events.Parse(eventType, r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Fatalln(err)
log.Fatal(err)
return
}
if err := a.SaveEvent(evt, ev); err != nil {
if err := a.store.Save(event); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Fatalln(err)
log.Fatal(err)
return
}
// Send to remote storage.
a.sender.Send(event)
w.WriteHeader(http.StatusOK)
}
// WebhookOpenHandler writes data from the mail provider to the database
func (a *App) WebhookOpenHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.OpenEventType)
a.webhookHandler(w, r, events.OpenType)
}
// WebhookClickHandler writes data from the mail provider to the database
func (a *App) WebhookClickHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.ClickEventType)
a.webhookHandler(w, r, events.ClickType)
}
// WebhookBounceHandler writes data from the mail provider to the database
func (a *App) WebhookBounceHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.BounceEventType)
a.webhookHandler(w, r, events.BounceType)
}
// WebhookSpamHandler writes data from the mail provider to the database
func (a *App) WebhookSpamHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.SpamEventType)
a.webhookHandler(w, r, events.SpamType)
}
// WebhookBlockedHandler writes data from the mail provider to the database
func (a *App) WebhookBlockedHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.BlockedEventType)
a.webhookHandler(w, r, events.BlockedType)
}
// WebhookUnsubHandler writes data from the mail provider to the database
func (a *App) WebhookUnsubHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.UnsubEventType)
a.webhookHandler(w, r, events.UnsubType)
}
// WebhookSentHandler writes data from the mail provider to the database
func (a *App) WebhookSentHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, event.SentEventType)
a.webhookHandler(w, r, events.SentType)
}
package app
import (
influxdb "github.com/influxdata/influxdb-client-go"
)
// NewInfluxdbClient create a new influx db client
func NewInfluxdbClient(a *App) influxdb.Client {
return influxdb.NewClient(a.config.InfluxdbAddr, a.config.InfluxdbToken)
}
package mailjet
import (
"log"
env "github.com/caarlos0/env/v6"
mailjet "github.com/mailjet/mailjet-apiv3-go"
)
// Mailjet client
type Mailjet struct {
*mailjet.Client
config *Config
}
// Config for the mailjet client
type Config struct {
MailjetUser string `env:"MAILJET_USER,required"`
MailjetPass string `env:"MAILJET_PASS,required"`
}
// NewConfig return a configuration from environement
func NewConfig() *Config {
config := &Config{}
if err := env.Parse(config); err != nil {
log.Fatal(err)
}
return config
}
// New create a new mailjet client
func New() *Mailjet {
config := NewConfig()
client := mailjet.NewMailjetClient(config.MailjetUser, config.MailjetPass)
return &Mailjet{client, config}
}
package app
package mailjet
import (
mailjet "github.com/mailjet/mailjet-apiv3-go"
"github.com/mailjet/mailjet-apiv3-go/resources"
)
// NewMailjetClient check whether the mailjet hooks are setup and valid
func NewMailjetClient(a *App) *mailjet.Client {
return mailjet.NewMailjetClient(a.config.MailjetUser, a.config.MailjetPass)
}
func (a *App) mailjetListWebhooks() ([]resources.Eventcallbackurl, error) {
// ListWebhooks return a webhooks list
func (m *Mailjet) ListWebhooks() ([]resources.Eventcallbackurl, error) {
var data []resources.Eventcallbackurl
_, _, err := a.mailjetClient.List("eventcallbackurl", &data)
_, _, err := m.List("eventcallbackurl", &data)
if err != nil {
return nil, err
}
......@@ -21,9 +17,10 @@ func (a *App) mailjetListWebhooks() ([]resources.Eventcallbackurl, error) {
return data, nil
}
func (a *App) mailjetDeleteWebhooks(webhooks []resources.Eventcallbackurl) error {
// DeleteWebhooks delete a webhooks list
func (m *Mailjet) DeleteWebhooks(webhooks []resources.Eventcallbackurl) error {
for _, webhook := range webhooks {
err := a.mailjetClient.Delete(&mailjet.Request{
err := m.Delete(&mailjet.Request{
Resource: "eventcallbackurl",
ID: webhook.ID,
})
......@@ -35,8 +32,9 @@ func (a *App) mailjetDeleteWebhooks(webhooks []resources.Eventcallbackurl) error
return nil
}
func (a *App) mailjetCreateWebhook(payload resources.Eventcallbackurl) error {
return a.mailjetClient.Post(&mailjet.FullRequest{
// CreateWebhook create a webhook
func (m *Mailjet) CreateWebhook(payload resources.Eventcallbackurl) error {
return m.Post(&mailjet.FullRequest{
Info: &mailjet.Request{Resource: "eventcallbackurl"},
Payload: &payload,
}, nil)
......
package app
import (
"log"
"github.com/gorilla/mux"
)
// NewRouter creates a router
func NewRouter(a *App) *mux.Router {
r := mux.NewRouter()
log.Println("creating router with public url '" + a.config.PublicURL + "'")
base := r.PathPrefix(a.config.publicURL.Path).Subrouter()
webhooks := base.PathPrefix("/webhooks").Subrouter()
webhooks.Use(a.WebhookAuthMiddleware)
InitWebhooksHandlers(a, webhooks)
return r
}
package app
import (
"bytes"
"context"
"errors"
"log"
"time"
"git.oolsa.net/jooola/jetstats/app/event"
influxdb "github.com/influxdata/influxdb-client-go"
bolt "go.etcd.io/bbolt"
)
// Send saved data to influxdb
func (a *App) Send() error {
ready, err := a.influxdbClient.Ready(context.Background())
if err != nil {
return err
}
if !ready {
return errors.New("influxdb server not ready")
}
if a.config.Verbose {
log.Println("reseting bucket")
}
d := a.influxdbClient.DeleteApi()
if err := d.DeleteWithName(
context.Background(),
a.config.InfluxdbOrg,
a.config.InfluxdbBucket,
time.Time{},
time.Now(),
"",
); err != nil {
return err
}
err = a.store.View(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(eventsBucketKey)
c := eventsBucket.Cursor()
w := a.influxdbClient.WriteApi(a.config.InfluxdbOrg, a.config.InfluxdbBucket)
for k, v := c.First(); k != nil; k, v = c.Next() {
evt, _ := event.ParseKey(k)
ev, err := event.Parse(evt, bytes.NewReader(v))
if err != nil {
return err
}
tags, fields := ev.Point()
p := influxdb.NewPoint(string(evt), tags, fields, ev.GetTime())
if a.config.Debug {
log.Printf("writing point: %s", ev.String())
}
w.WritePoint(p)
}
w.Flush()
return nil
})
if err != nil {
return err
}
return nil
}
......@@ -7,6 +7,8 @@ import (
"os"
"os/signal"
"time"
"github.com/gorilla/mux"
)
// Timeout for http server in seconds
......@@ -43,3 +45,17 @@ func (a *App) Serve() {
log.Fatalln(err)
}
}
// NewRouter creates a router
func NewRouter(a *App) *mux.Router {
r := mux.NewRouter()
log.Println("creating router with public url '" + a.config.PublicURL + "'")
base := r.PathPrefix(a.config.publicURL.Path).Subrouter()
webhooks := base.PathPrefix("/webhooks").Subrouter()
webhooks.Use(a.WebhookAuthMiddleware)
InitWebhooksHandlers(a, webhooks)
return r
}
package app
import (
"log"
"git.oolsa.net/jooola/jetstats/app/event"
bolt "go.etcd.io/bbolt"
)
var (
eventsBucketKey = []byte("events")
)
// NewStore creates a store
func NewStore(a *App) *bolt.DB {
ReadOnly := !a.config.AllowWrite
if a.config.Verbose {
log.Printf("opening database=%s readonly=%t\n", a.config.Database, ReadOnly)
}
db, err := bolt.Open(a.config.Database, 0600, &bolt.Options{ReadOnly: ReadOnly})
if err != nil {
log.Fatalln(err)
}
if ReadOnly {
return db
}
if err := db.Update(func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(eventsBucketKey); err != nil {
return err
}
return nil
}); err != nil {
log.Fatalln(err)
}
return db
}
// SaveEvent store the event and add it to the queue
func (a *App) SaveEvent(evt event.Type, ev event.Event) error {
return a.store.Update(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(eventsBucketKey)
if err := eventsBucket.Put(ev.Key(), ev.Bytes()); err != nil {
return err
}
if a.config.Debug {
go func(ev event.Event) {
log.Printf("saving event: %s", ev.String())
}(ev)
}
return nil
})
}
package app
import (
"bytes"
"log"
"os"
"testing"
"git.oolsa.net/jooola/jetstats/app/event"
bolt "go.etcd.io/bbolt"
)
func TestStore(t *testing.T) {
a := NewApp(newTestConfig())
file, err := os.Open("event/fixtures/click.json")
if err != nil {
log.Fatalln(err)
}
ev, err := event.Parse(event.ClickEventType, file)
if err != nil {
t.Error(err)
}
if err := a.SaveEvent(event.ClickEventType, ev); err != nil {
t.Error(err)
}
err = a.store.View(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(eventsBucketKey)
data := eventsBucket.Get(ev.Key())
if data == nil {
t.Fail()
}
ev2, err := event.Parse(event.ClickEventType, bytes.NewReader(data))
if err != nil {
t.Error(err)
}
if string(ev.Bytes()) != string(ev2.Bytes()) {
t.Fail()
}
return nil
})
if err != nil {
t.Error(err)
}
}
......@@ -5,41 +5,47 @@ import (
"net/url"
"path"
"git.oolsa.net/jooola/jetstats/app/event"
"git.oolsa.net/jooola/jetstats/events"
"github.com/mailjet/mailjet-apiv3-go/resources"
)
func (a *App) createWebhookURL(evt event.Type) string {
u := *a.config.publicURL
u.Path = path.Join(u.Path, "webhooks", string(evt))
u.User = url.UserPassword(a.config.WebhookUser, a.config.WebhookPass)
// CreateWebhookURL return a url string based on the event type
func CreateWebhookURL(config *Config, eventType string) string {
u := *config.publicURL
u.Path = path.Join(u.Path, "webhooks", eventType)
u.User = url.UserPassword(config.WebhookUser, config.WebhookPass)
return u.String()
}
// CreateWebhookURL return a url string based on the event type
func (a *App) CreateWebhookURL(eventType string) string {
return CreateWebhookURL(a.config, eventType)
}
// WebhooksStatus check if mailjet webhooks are valid and all set
func (a *App) WebhooksStatus() ([]resources.Eventcallbackurl, error) {
webhooks, err := a.mailjetListWebhooks()
webhooks, err := a.mailjet.ListWebhooks()
if err != nil {
return nil, err
}
var validWebhooks []resources.Eventcallbackurl
for _, evt := range event.Types {
for _, eventType := range events.Types {
for _, webhook := range webhooks {
if string(evt) == webhook.EventType {
if eventType == webhook.EventType {
if !webhook.IsBackup &&
webhook.Status == "alive" &&
webhook.URL == a.createWebhookURL(evt) {
webhook.URL == a.CreateWebhookURL(eventType) {
validWebhooks = append(validWebhooks, webhook)
}
}
}
}
if len(validWebhooks) != len(event.Types) {
if len(validWebhooks) != len(events.Types) {
return validWebhooks, errors.New("mailjet webhooks are not set or not valid")
}
......@@ -52,11 +58,11 @@ func (a *App) WebhooksSetup() error {
return err
}
for _, evt := range event.Types {
err := a.mailjetCreateWebhook(resources.Eventcallbackurl{
EventType: string(evt),
for _, eventType := range events.Types {
err := a.mailjet.CreateWebhook(resources.Eventcallbackurl{
EventType: eventType,
Status: "alive",
URL: a.createWebhookURL(evt),
URL: a.CreateWebhookURL(eventType),
})
if err != nil {
return err
......@@ -68,12 +74,12 @@ func (a *App) WebhooksSetup() error {
// WebhooksDelete delete old mailjet webhooks
func (a *App) WebhooksDelete() error {
webhooks, err := a.mailjetListWebhooks()
webhooks, err := a.mailjet.ListWebhooks()
if err != nil {
return err
}
if err := a.mailjetDeleteWebhooks(webhooks); err != nil {
if err := a.mailjet.DeleteWebhooks(webhooks); err != nil {
return err
}
......
package cmd
import (
"bytes"
"fmt"
"log"
"net/http"
"git.oolsa.net/jooola/jetstats/app"
"git.oolsa.net/jooola/jetstats/events"
"git.oolsa.net/jooola/jetstats/x"
"github.com/spf13/cobra"
)
var mockCmd = &cobra.Command{
Use: "mock",
Short: "Mocking utilities",
}
func init() {
mockCmd.AddCommand(mockSendEvents)
mockSendEvents.Flags().IntVarP(&eventCount, "count", "n", 3, "Number of events to sent")
}
var (
eventCount int
)
var mockSendEvents = &cobra.Command{
Use: "events",
Short: "Send fake events to the server",
Run: func(cmd *cobra.Command, args []string) {
config := app.NewConfig()
counts := make(map[string]int, len(events.Types))
for _, t := range events.Types {
counts[t] = 0
}
for _, event := range x.RandomEvents(eventCount) {
if _, err := http.Post(
app.CreateWebhookURL(config, event.GetType()),
"application/json",
bytes.NewReader(event.Bytes()),
); err != nil {
log.Fatal(err)
}
counts[event.GetType()]++
}
for eventType, count := range counts {
fmt.Printf("created %d %s events\n", count, eventType)
}
},
}
package cmd
import (
"log"
"os"
"runtime"
"runtime/pprof"
"github.com/spf13/cobra"
)
......@@ -14,49 +9,13 @@ var rootCmd = &cobra.Command{
Short: "Listen for mailjet events and send them to influxdb",
}
var (
configPath string
profileCPU string
profileMem string
)
func init() {
rootCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "jetstats.yml", "configuration `file`")
rootCmd.PersistentFlags().StringVar(&profileCPU, "cpuprofile", "", "write cpu profile to `file`")
rootCmd.PersistentFlags().StringVar(&profileMem, "memprofile", "", "write memory profile to `file`")
rootCmd.AddCommand(serverCmd)
rootCmd.AddCommand(webhooksCmd)
rootCmd.AddCommand(sendCmd)
rootCmd.AddCommand(mockCmd)
}
// Execute root command
func Execute() error {
if profileCPU != "" {
f, err := os.Create(profileCPU)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
}
err := rootCmd.Execute()
if profileMem != "" {
f, err := os.Create(profileMem)
if err != nil {
log.Fatal("could not create memory profile: ", err)
}
defer f.Close()
runtime.GC()
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
}
return err
return rootCmd.Execute()
}
package cmd
import (
"log"
"git.oolsa.net/jooola/jetstats/app"
"github.com/spf13/cobra"
)
var sendCmd = &cobra.Command{
Use: "send",
Short: "Flush influxdb bucket and send all saved points",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
if err := a.Send(); err != nil {
log.Println(err)
}
defer a.Close()
},
}
......@@ -10,10 +10,9 @@ var serverCmd = &cobra.Command{
Use: "server",
Short: "Listen for mailjet events using webhooks",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewServer(app.NewConfig(configPath))
a := app.NewServer(app.NewConfig())
a.Serve()
defer a.Close()
},
}
......@@ -24,7 +24,7 @@ var webhooksStatusCmd = &cobra.Command{
Use: "status",
Short: "Show mailjet webhooks status",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
a := app.NewApp(app.NewConfig())
webhooks, err := a.WebhooksStatus()
for _, webhook := range webhooks {
......@@ -42,7 +42,7 @@ var webhooksSetupCmd = &cobra.Command{
Use: "setup",
Short: "Setup mailjet webhooks",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
a := app.NewApp(app.NewConfig())
if err := a.WebhooksSetup(); err != nil {
log.Fatalln(err)
......@@ -56,7 +56,7 @@ var webhooksDeleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete mailjet webhooks",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
a := app.NewApp(app.NewConfig())
if err := a.WebhooksDelete(); err != nil {
log.Fatalln(err)
......
#!/usr/bin/env bash
set -eu
error() {
echo >&2 "$*"
exit 1
}
command -v docker > /dev/null || error "docker command not found!"
source setup.sh
# shellcheck disable=SC1091
source .env
# TODO: Move bucket creation to golang tests
typeset -r tmp_bucket="$bucket-e2e-$(date '+%s')"
docker exec -it "$container_name" influx bucket create \
--org "$org" \
--name "$tmp_bucket" > /dev/null
# Minimal influxdb config
export E2E="true"
export INFLUXDB_ADDR
export INFLUXDB_TOKEN
export INFLUXDB_ORG
export INFLUXDB_BUCKET="$tmp_bucket"
package event
package events
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"strings"
"time"
"github.com/google/uuid"
)
// Type event
type Type string
// Event type enum
const (
OpenEventType Type = "open"
ClickEventType Type = "click"
BounceEventType Type = "bounce"
SpamEventType Type = "spam"
BlockedEventType Type = "blocked"
UnsubEventType Type = "unsub"
SentEventType Type = "sent"
)
// Types defines all events types
var Types = []Type{
SentEventType,
OpenEventType,
ClickEventType,
UnsubEventType,
BounceEventType,
BlockedEventType,
SpamEventType,
}
func Parse(t Type, data io.Reader) (Event, error) {
var event Event
switch t {
case OpenEventType:
event = &OpenEvent{}
case ClickEventType:
event = &ClickEvent{}
case BounceEventType:
event = &BounceEvent{}
case SpamEventType:
event = &SpamEvent{}
case BlockedEventType:
event = &BlockedEvent{}
case UnsubEventType:
event = &UnsubEvent{}
case SentEventType:
event = &SentEvent{}
}
decoder := json.NewDecoder(data)
err := decoder.Decode(&event)
if err != nil {
return nil, err
}
event.SetUUID()
return event, nil
}
// Event interface for all events
type Event interface {
SetUUID()
GetUUID() string
GetTime() time.Time
Key() []byte
Bytes() []byte
String() string
Point() (map[string]string, map[string]interface{})
}
type baseEvent struct {
// Base define a base for all other events, should not be used
type Base struct {
UUID string `json:"uuid"`
Event string `json:"event"` // the event type
......@@ -98,60 +25,34 @@ type baseEvent struct {
CustomID string `json:"CustomID"` // the event payload, when provided at send time
}
// Key of the event
func (e *baseEvent) Key() []byte {
return []byte(fmt.Sprintf("%s|%s", e.Event, e.GetUUID()))
}
// ParseKey for an event
func ParseKey(key []byte) (Type, string) {
parts := strings.Split(string(key), "|")
return Type(parts[0]), parts[1] // event type | uuid
}
// SetUUID of the event
func (e *baseEvent) SetUUID() {
func (e *Base) SetUUID() {
if e.UUID == "" {
e.UUID = uuid.New().String()
}
}
// GetUUID of the event
func (e *baseEvent) GetUUID() string {
func (e *Base) GetUUID() string {
return e.UUID
}
// GetTime of the event
func (e *baseEvent) GetTime() time.Time {
func (e *Base) GetTime() time.Time {
return time.Unix(int64(e.Timestamp), 0)
}
// String convert an event to a string
func (e *baseEvent) String() string {
return fmt.Sprintf("uuid=%s timestamp=%d id=%d event=%s ", e.UUID, e.Timestamp, e.MailjetMessageID, e.Event)
// GetType of the event
func (e *Base) GetType() string {
return e.Event
}
// Point return all tags and fields for influxdb
func (e *baseEvent) Point() (map[string]string, map[string]interface{}) {
tags := make(map[string]string)
tags["mj_message_id"] = fmt.Sprintf("%d", e.MailjetMessageID)
tags["mj_campaign_id"] = fmt.Sprintf("%d", e.MailjetCampaignID)
tags["mj_contact_id"] = fmt.Sprintf("%d", e.MailjetContactID)
fields := make(map[string]interface{})
fields["email"] = e.Email
return tags, fields
// String convert an event to a string
func (e *Base) String() string {
return fmt.Sprintf("uuid=%s timestamp=%d id=%d event=%s ", e.UUID, e.Timestamp, e.MailjetMessageID, e.Event)
}
// Bytes convert the event to a json body
func Bytes(e interface{}) []byte {
buf := bytes.NewBuffer(nil)
encoder := json.NewEncoder(buf)
if err := encoder.Encode(e); err != nil {
log.Fatalln(err)
}
return buf.Bytes()
func (e *Base) Bytes() []byte {
return Bytes(e)
}
package event
package events
import (
"bytes"
"fmt"
"log"
"os"
"strings"
"testing"
)
func TestBaseEvent(t *testing.T) {
func TestBase(t *testing.T) {
file, err := os.Open("fixtures/click.json")
if err != nil {
log.Fatalln(err)
}
event, err := Parse(ClickEventType, file)
event, err := Parse(ClickType, file)
if err != nil {
t.Error(err)
}
......@@ -28,27 +26,3 @@ func TestBaseEvent(t *testing.T) {
t.Fail()
}
}
func testEventParsing(t *testing.T, eventType Type) Event {
file, err := os.Open(fmt.Sprintf("fixtures/%s.json", string(eventType)))
if err != nil {
log.Fatalln(err)
}
defer file.Close()
event, err := Parse(eventType, file)
if err != nil {
t.Error(err)
}
event2, err := Parse(eventType, bytes.NewReader(event.Bytes()))
if err != nil {
t.Error(err)
}
if string(event.Bytes()) != string(event2.Bytes()) {
t.Errorf("event bytes mismatch after 2 parsing")
}
return event
}
package event
package events
// BlockedEvent define a blocked event occurred on mailjet side
type BlockedEvent struct {
baseEvent
// Blocked define a blocked event occurred on mailjet side
type Blocked struct {
Base
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
}
// Point return all tags and fields for influxdb
func (e *BlockedEvent) Point() (