...
 
Commits (5)
package app
import (
"log"
"github.com/gorilla/mux"
influxdb "github.com/influxdata/influxdb-client-go"
mailjet "github.com/mailjet/mailjet-apiv3-go"
cron "github.com/robfig/cron/v3"
bolt "go.etcd.io/bbolt"
)
......@@ -14,8 +13,9 @@ type App struct {
config *Config
store *bolt.DB
router *mux.Router
cron *cron.Cron
cursorPosition string
sending bool
mailjetClient *mailjet.Client
influxdbClient influxdb.Client
......@@ -30,9 +30,16 @@ func NewApp(config *Config) *App {
a.mailjetClient = NewMailjetClient(a)
a.influxdbClient = NewInfluxdbClient(a)
if err := a.LoadCursorPosition(); err != nil {
log.Fatalln(err)
}
return a
}
// NewServer creates a server from a new app
func NewServer(config *Config) *App {
a := NewApp(config)
a.router = NewRouter(a)
a.cron = NewCron(a)
a.cron.Start()
return a
}
......
......@@ -9,7 +9,9 @@ import (
// Config defines the application configuration
type Config struct {
ListenAddr string `default:"127.0.0.1:7000" yaml:"listen_addr"`
ListenAddr string `default:"127.0.0.1:7000" yaml:"listen_addr"`
Verbose bool `default:"false" yaml:"verbose"`
SendInterval string `default:"1h" yaml:"send_interval"`
PublicURL string `required:"true" yaml:"public_url"`
publicURL *url.URL
......@@ -22,8 +24,10 @@ type Config struct {
MailjetUser string `required:"true" yaml:"mailjet_user"`
MailjetPass string `required:"true" yaml:"mailjet_pass"`
InfluxdbAddr string `required:"true" yaml:"influxdb_addr"`
InfluxdbPass string `required:"true" yaml:"influxdb_pass"`
InfluxdbAddr string `required:"true" yaml:"influxdb_addr"`
InfluxdbToken string `required:"true" yaml:"influxdb_token"`
InfluxdbOrg string `required:"true" yaml:"influxdb_org"`
InfluxdbBucket string `required:"true" yaml:"influxdb_bucket"`
}
// NewConfig return a new configuration from path
......
package app
import (
"io/ioutil"
"log"
)
func newTestConfig() *Config {
tmpfile, err := ioutil.TempFile("", "jetstats.*.db")
if err != nil {
log.Fatalln(err)
}
return &Config{
PublicURL: "http://127.0.0.1:7000",
Database: tmpfile.Name(),
WebhookUser: "user",
WebhookPass: "pass",
MailjetUser: "user",
MailjetPass: "pass",
InfluxdbAddr: "http://127.0.0.1:9999",
InfluxdbToken: "pass",
}
}
package app
import (
"fmt"
"log"
cron "github.com/robfig/cron/v3"
)
// NewCron create a new cron component
func NewCron(a *App) *cron.Cron {
c := cron.New()
log.Printf("cron will send points every %s\n", a.config.SendInterval)
if _, err := c.AddFunc(fmt.Sprintf("@every %s", a.config.SendInterval), func() {
if !a.sending {
a.sending = true
if err := a.Send(); err != nil {
log.Println(err)
}
a.sending = false
}
}); err != nil {
log.Fatalln(err)
}
return c
}
......@@ -12,8 +12,8 @@ type BlockedEvent struct {
// Point return all tags and fields for influxdb
func (e *BlockedEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["error_related_to"] = e.ErrorRelatedTo
fields["error"] = e.Error
tags["error_related_to"] = e.ErrorRelatedTo
tags["error"] = e.Error
return tags, fields
}
......
......@@ -13,15 +13,14 @@ func TestBlockedEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "blocked"},
{fields["id"], uint64(0)},
{fields["email"], "blocked@mailjet.com"},
{tags["email"], "blocked@mailjet.com"},
{fields["mj_message_id"], uint64(13792286917004336)},
{fields["mj_campaign_id"], uint64(380)},
{fields["mj_contact_id"], uint64(19092)},
{fields["mj_campaign_payload"], ""},
{fields["error_related_to"], "recipient"},
{fields["error"], "user unknown"},
{tags["error_related_to"], "recipient"},
{tags["error"], "user unknown"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
......
......@@ -16,10 +16,10 @@ type BounceEvent struct {
// Point return all tags and fields for influxdb
func (e *BounceEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
tags["error_related_to"] = e.ErrorRelatedTo
tags["error"] = e.Error
fields["blocked"] = e.Blocked
fields["hard_bounce"] = e.HardBounce
fields["error_related_to"] = e.ErrorRelatedTo
fields["error"] = e.Error
fields["comment"] = e.Comment
return tags, fields
......
......@@ -13,8 +13,7 @@ func TestBounceEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "bounce"},
{fields["id"], uint64(0)},
{fields["email"], "bounce@mailjet.com"},
{tags["email"], "bounce@mailjet.com"},
{fields["mj_message_id"], uint64(13792286917004336)},
{fields["mj_campaign_id"], uint64(2)},
{fields["mj_contact_id"], uint64(199)},
......@@ -22,8 +21,8 @@ func TestBounceEvent(t *testing.T) {
{fields["blocked"], false},
{fields["hard_bounce"], true},
{fields["error_related_to"], "recipient"},
{fields["error"], "user unknown"},
{tags["error_related_to"], "recipient"},
{tags["error"], "user unknown"},
{fields["comment"], "Host or domain name not found: Host not found"},
} {
if test.in != test.out {
......
......@@ -9,7 +9,7 @@ type ClickEvent struct {
// Point return all tags and fields for influxdb
func (e *ClickEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.OpenEvent.Point()
fields["url"] = e.URL
tags["url"] = e.URL
return tags, fields
}
......
......@@ -13,18 +13,17 @@ func TestClickEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "click"},
{fields["id"], uint64(0)},
{fields["email"], "api@mailjet.com"},
{tags["email"], "api@mailjet.com"},
{fields["mj_message_id"], uint64(19421777836302490)},
{fields["mj_campaign_id"], uint64(7272)},
{fields["mj_contact_id"], uint64(4)},
{fields["mj_campaign_payload"], ""},
{fields["ip"], "127.0.0.1"},
{fields["geo"], "FR"},
{fields["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0) AppleWebKit/537.36"},
{tags["geo"], "FR"},
{tags["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0) AppleWebKit/537.36"},
{fields["url"], "https://mailjet.com"},
{tags["url"], "https://mailjet.com"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
......
......@@ -6,7 +6,10 @@ import (
"fmt"
"io"
"log"
"strings"
"time"
"github.com/google/uuid"
)
// Type event
......@@ -61,21 +64,26 @@ func Parse(t Type, data io.Reader) (Event, error) {
return nil, err
}
event.SetUUID()
return event, nil
}
// Event interface for all events
type Event interface {
SetID(uint64)
SetUUID()
GetUUID() string
GetTime() time.Time
Key() []byte
Bytes() []byte
String() string
Point() (map[string]string, map[string]interface{})
}
type baseEvent struct {
ID uint64 `json:"id"`
UUID string `json:"uuid"`
Event string `json:"event"` // the event type
Timestamp uint64 `json:"time"` // Unix timestamp of event
Email string `json:"email"` // email address of recipient triggering the event
......@@ -90,9 +98,27 @@ type baseEvent struct {
CustomID string `json:"CustomID"` // the event payload, when provided at send time
}
// SetID for the event
func (e *baseEvent) SetID(value uint64) {
e.ID = value
// Key of the event
func (e *baseEvent) Key() []byte {
return []byte(fmt.Sprintf("%s|%s", e.Event, e.GetUUID()))
}
// ParseKey for an event
func ParseKey(key []byte) (Type, string) {
parts := strings.Split(string(key), "|")
return Type(parts[0]), parts[1] // event type | uuid
}
// SetUUID of the event
func (e *baseEvent) SetUUID() {
if e.UUID == "" {
e.UUID = uuid.New().String()
}
}
// GetUUID of the event
func (e *baseEvent) GetUUID() string {
return e.UUID
}
// GetTime of the event
......@@ -102,17 +128,17 @@ func (e *baseEvent) GetTime() time.Time {
// String convert an event to a string
func (e *baseEvent) String() string {
return fmt.Sprintf("id=%d event=%s timestamp=%d", e.ID, e.Event, e.Timestamp)
return fmt.Sprintf("uuid=%s event=%s timestamp=%d", e.UUID, e.Event, e.Timestamp)
}
// Point return all tags and fields for influxdb
func (e *baseEvent) Point() (map[string]string, map[string]interface{}) {
tags := make(map[string]string)
tags["type"] = e.Event
fields := make(map[string]interface{})
fields["id"] = e.ID
fields["email"] = e.Email
fields["uuid"] = e.UUID
tags["type"] = e.Event
tags["email"] = e.Email
fields["mj_message_id"] = e.MailjetMessageID
fields["mj_campaign_id"] = e.MailjetCampaignID
fields["mj_contact_id"] = e.MailjetContactID
......
......@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"strings"
"testing"
)
......@@ -19,13 +20,11 @@ func TestBaseEvent(t *testing.T) {
t.Error(err)
}
event.SetID(3)
if event.GetTime().Unix() != 1433334653 {
t.Fail()
}
if event.String() != "id=3 event=click timestamp=1433334653" {
if !strings.Contains(event.String(), " event=click timestamp=1433334653") {
t.Fail()
}
}
......
......@@ -13,8 +13,8 @@ type OpenEvent struct {
func (e *OpenEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["ip"] = e.IP
fields["geo"] = e.Geo
fields["user_agent"] = e.UserAgent
tags["geo"] = e.Geo
tags["user_agent"] = e.UserAgent
return tags, fields
}
......
......@@ -13,16 +13,15 @@ func TestOpenEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "open"},
{fields["id"], uint64(0)},
{fields["email"], "api@mailjet.com"},
{tags["email"], "api@mailjet.com"},
{fields["mj_message_id"], uint64(19421777396190490)},
{fields["mj_campaign_id"], uint64(7173)},
{fields["mj_contact_id"], uint64(320)},
{fields["mj_campaign_payload"], ""},
{fields["ip"], "127.0.0.1"},
{fields["geo"], "US"},
{fields["user_agent"], "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0"},
{tags["geo"], "US"},
{tags["user_agent"], "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
......
......@@ -13,8 +13,7 @@ func TestSentEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "sent"},
{fields["id"], uint64(0)},
{fields["email"], "api@mailjet.com"},
{tags["email"], "api@mailjet.com"},
{fields["mj_message_id"], uint64(19421777835146490)},
{fields["mj_campaign_id"], uint64(7257)},
{fields["mj_contact_id"], uint64(4)},
......
......@@ -10,7 +10,7 @@ type SpamEvent struct {
// Point return all tags and fields for influxdb
func (e *SpamEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["source"] = e.Source
tags["source"] = e.Source
return tags, fields
}
......
......@@ -13,14 +13,13 @@ func TestSpamEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "spam"},
{fields["id"], uint64(0)},
{fields["email"], "bounce@mailjet.com"},
{tags["email"], "bounce@mailjet.com"},
{fields["mj_message_id"], uint64(13792286917004336)},
{fields["mj_campaign_id"], uint64(1898)},
{fields["mj_contact_id"], uint64(10920)},
{fields["mj_campaign_payload"], ""},
{fields["source"], "JMRPP"},
{tags["source"], "JMRPP"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
......
......@@ -13,16 +13,15 @@ func TestUnsubEvent(t *testing.T) {
out interface{}
}{
{tags["type"], "unsub"},
{fields["id"], uint64(0)},
{fields["email"], "api@mailjet.com"},
{tags["email"], "api@mailjet.com"},
{fields["mj_message_id"], uint64(20547674933128000)},
{fields["mj_campaign_id"], uint64(7276)},
{fields["mj_contact_id"], uint64(126)},
{fields["mj_campaign_payload"], ""},
{fields["ip"], "127.0.0.1"},
{fields["geo"], "FR"},
{fields["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0)"},
{tags["geo"], "FR"},
{tags["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0)"},
{fields["mj_list_id"], uint64(1)},
} {
......
......@@ -6,5 +6,5 @@ import (
// NewInfluxdbClient create a new influx db client
func NewInfluxdbClient(a *App) influxdb.Client {
return influxdb.NewClient(a.config.InfluxdbAddr, a.config.InfluxdbPass)
return influxdb.NewClient(a.config.InfluxdbAddr, a.config.InfluxdbToken)
}
......@@ -4,15 +4,16 @@ import (
"bytes"
"context"
"errors"
"strings"
"log"
"time"
"git.oolsa.net/jooola/jetstats/app/event"
influxdb "github.com/influxdata/influxdb-client-go"
"go.etcd.io/bbolt"
bolt "go.etcd.io/bbolt"
)
// Send data to influxdb
// Send saved data to influxdb
func (a *App) Send() error {
ready, err := a.influxdbClient.Ready(context.Background())
if err != nil {
......@@ -23,28 +24,42 @@ func (a *App) Send() error {
return errors.New("influxdb server not ready")
}
err = a.store.View(func(tx *bbolt.Tx) error {
if a.config.Verbose {
log.Println("reseting bucket")
}
d := a.influxdbClient.DeleteApi()
if err := d.DeleteWithName(
context.Background(),
a.config.InfluxdbOrg,
a.config.InfluxdbBucket,
time.Time{},
time.Now(),
"",
); err != nil {
return err
}
err = a.store.View(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(eventsBucketKey)
c := eventsBucket.Cursor()
var ik, iv []byte
if a.cursorPosition != "" {
ik, iv = c.Seek([]byte(a.cursorPosition))
} else {
ik, iv = c.First()
}
w := a.influxdbClient.WriteApi("oolsa", "jetstats")
for k, v := ik, iv; k != nil; k, v = c.Next() {
eventType := strings.Split(string(k), "|")[0]
ev, err := event.Parse(event.Type(eventType), bytes.NewReader(v))
w := a.influxdbClient.WriteApi(a.config.InfluxdbOrg, a.config.InfluxdbBucket)
for k, v := c.First(); k != nil; k, v = c.Next() {
evt, _ := event.ParseKey(k)
ev, err := event.Parse(evt, bytes.NewReader(v))
if err != nil {
return err
}
tags, fields := ev.Point()
p := influxdb.NewPoint("event", tags, fields, ev.GetTime())
p := influxdb.NewPoint(string(evt), tags, fields, ev.GetTime())
if a.config.Verbose {
log.Printf("writing point: %s", ev.String())
}
w.WritePoint(p)
}
......
......@@ -14,8 +14,6 @@ const Timeout = 15
// Serve create routes and start listening for calls
func (a *App) Serve() {
a.router = NewRouter(a)
srv := &http.Server{
Addr: a.config.ListenAddr,
WriteTimeout: time.Second * Timeout,
......
package app
import (
"fmt"
"log"
"git.oolsa.net/jooola/jetstats/app/event"
"go.etcd.io/bbolt"
bolt "go.etcd.io/bbolt"
)
var (
eventsBucketKey = []byte("events")
cursorBucketKey = []byte("cursor")
)
func bytesToString(v []byte) string {
if v == nil {
return ""
}
return string(v)
}
// NewStore creates a store
func NewStore(a *App) *bbolt.DB {
db, err := bbolt.Open(a.config.Database, 0600, nil)
func NewStore(a *App) *bolt.DB {
db, err := bolt.Open(a.config.Database, 0600, nil)
if err != nil {
log.Fatalln(err)
}
if err := db.Update(func(tx *bbolt.Tx) error {
if err := db.Update(func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(eventsBucketKey); err != nil {
return err
}
......@@ -42,43 +32,19 @@ func NewStore(a *App) *bbolt.DB {
// SaveEvent store the event and add it to the queue
func (a *App) SaveEvent(evt event.Type, ev event.Event) error {
return a.store.Update(func(tx *bbolt.Tx) error {
return a.store.Update(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(eventsBucketKey)
id, _ := eventsBucket.NextSequence()
ev.SetID(id)
key := fmt.Sprintf("%s|%d", evt, id)
if err := eventsBucket.Put([]byte(key), ev.Bytes()); err != nil {
if err := eventsBucket.Put(ev.Key(), ev.Bytes()); err != nil {
return err
}
go func(ev event.Event) {
log.Println(ev.String())
}(ev)
return nil
})
}
func (a *App) SaveCursorPosition(id string) error {
return a.store.Update(func(tx *bbolt.Tx) error {
cursorBucket := tx.Bucket(cursorBucketKey)
if err := cursorBucket.Put(cursorBucketKey, []byte(id)); err != nil {
return err
if a.config.Verbose {
go func(ev event.Event) {
log.Printf("saving event: %s", ev.String())
}(ev)
}
a.cursorPosition = id
return nil
})
}
func (a *App) LoadCursorPosition() error {
return a.store.View(func(tx *bbolt.Tx) error {
cursorBucket := tx.Bucket(cursorBucketKey)
a.cursorPosition = bytesToString(cursorBucket.Get(cursorBucketKey))
return nil
})
}
package app
import (
"bytes"
"log"
"os"
"testing"
"git.oolsa.net/jooola/jetstats/app/event"
bolt "go.etcd.io/bbolt"
)
func TestStore(t *testing.T) {
a := NewApp(newTestConfig())
file, err := os.Open("event/fixtures/click.json")
if err != nil {
log.Fatalln(err)
}
ev, err := event.Parse(event.ClickEventType, file)
if err != nil {
t.Error(err)
}
if err := a.SaveEvent(event.ClickEventType, ev); err != nil {
t.Error(err)
}
err = a.store.View(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(eventsBucketKey)
data := eventsBucket.Get(ev.Key())
if data == nil {
t.Fail()
}
ev2, err := event.Parse(event.ClickEventType, bytes.NewReader(data))
if err != nil {
t.Error(err)
}
if string(ev.Bytes()) != string(ev2.Bytes()) {
t.Fail()
}
return nil
})
if err != nil {
t.Error(err)
}
}
......@@ -16,6 +16,7 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "jetstats.yml", "configuration file")
rootCmd.AddCommand(serverCmd)
rootCmd.AddCommand(webhooksCmd)
rootCmd.AddCommand(sendCmd)
}
// Execute root command
......
package cmd
import (
"log"
"git.oolsa.net/jooola/jetstats/app"
"github.com/spf13/cobra"
)
var sendCmd = &cobra.Command{
Use: "send",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
if err := a.Send(); err != nil {
log.Println(err)
}
defer a.Close()
},
}
......@@ -9,7 +9,7 @@ import (
var serverCmd = &cobra.Command{
Use: "server",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
a := app.NewServer(app.NewConfig(configPath))
a.Serve()
......