Commit 55a7e015 authored by jonas's avatar jonas

Add influxdb sender

parent 797cdc12
Pipeline #723 passed with stage
in 52 seconds
package app
import (
"log"
"github.com/gorilla/mux"
influxdb "github.com/influxdata/influxdb-client-go"
mailjet "github.com/mailjet/mailjet-apiv3-go"
"go.etcd.io/bbolt"
bolt "go.etcd.io/bbolt"
)
// App is a mailstats application
type App struct {
config *Config
store *bbolt.DB
store *bolt.DB
router *mux.Router
mailjetClient *mailjet.Client
cursorPosition string
mailjetClient *mailjet.Client
influxdbClient influxdb.Client
}
// NewApp creates a new app and initiate its components
......@@ -22,6 +28,11 @@ func NewApp(config *Config) *App {
a.config = config
a.store = NewStore(a)
a.mailjetClient = NewMailjetClient(a)
a.influxdbClient = NewInfluxdbClient(a)
if err := a.LoadCursorPosition(); err != nil {
log.Fatalln(err)
}
return a
}
......@@ -29,4 +40,5 @@ func NewApp(config *Config) *App {
// Close closes all application components
func (a *App) Close() {
a.store.Close()
a.influxdbClient.Close()
}
......@@ -21,6 +21,9 @@ type Config struct {
MailjetUser string `required:"true" yaml:"mailjet_user"`
MailjetPass string `required:"true" yaml:"mailjet_pass"`
InfluxdbAddr string `required:"true" yaml:"influxdb_addr"`
InfluxdbPass string `required:"true" yaml:"influxdb_pass"`
}
// NewConfig return a new configuration from path
......
package event
// BlockedEvent define a blocked event occurred on mailjet side
type BlockedEvent struct {
baseEvent
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
}
// Point return all tags and fields for influxdb
func (e *BlockedEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["error_related_to"] = e.ErrorRelatedTo
fields["error"] = e.Error
return tags, fields
}
// Bytes convert the event to a json body
func (e *BlockedEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestBlockedEvent(t *testing.T) {
event := testEventParsing(t, BlockedEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{tags["type"], "blocked"},
{fields["id"], uint64(0)},
{fields["email"], "blocked@mailjet.com"},
{fields["mj_message_id"], uint64(13792286917004336)},
{fields["mj_campaign_id"], uint64(380)},
{fields["mj_contact_id"], uint64(19092)},
{fields["mj_campaign_payload"], ""},
{fields["error_related_to"], "recipient"},
{fields["error"], "user unknown"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
// BounceEvent define a bounce event occurred on mailjet side
type BounceEvent struct {
baseEvent
Blocked bool `json:"blocked"` // true if this bounce leads to the recipient being blocked
HardBounce bool `json:"hard_bounce"` // true if error was permanent
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
Comment string `json:"comment"` // the raw SMTP error code, including descriptions of the reason for the bounce
}
// Point return all tags and fields for influxdb
func (e *BounceEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["blocked"] = e.Blocked
fields["hard_bounce"] = e.HardBounce
fields["error_related_to"] = e.ErrorRelatedTo
fields["error"] = e.Error
fields["comment"] = e.Comment
return tags, fields
}
// Bytes convert the event to a json body
func (e *BounceEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestBounceEvent(t *testing.T) {
event := testEventParsing(t, BounceEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{tags["type"], "bounce"},
{fields["id"], uint64(0)},
{fields["email"], "bounce@mailjet.com"},
{fields["mj_message_id"], uint64(13792286917004336)},
{fields["mj_campaign_id"], uint64(2)},
{fields["mj_contact_id"], uint64(199)},
{fields["mj_campaign_payload"], ""},
{fields["blocked"], false},
{fields["hard_bounce"], true},
{fields["error_related_to"], "recipient"},
{fields["error"], "user unknown"},
{fields["comment"], "Host or domain name not found: Host not found"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
// ClickEvent define a click event occurred on mailjet side
type ClickEvent struct {
OpenEvent
URL string `json:"url"` // the link that was clicked
}
// Point return all tags and fields for influxdb
func (e *ClickEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.OpenEvent.Point()
fields["url"] = e.URL
return tags, fields
}
// Bytes convert the event to a json body
func (e *ClickEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestClickEvent(t *testing.T) {
event := testEventParsing(t, ClickEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{tags["type"], "click"},
{fields["id"], uint64(0)},
{fields["email"], "api@mailjet.com"},
{fields["mj_message_id"], uint64(19421777836302490)},
{fields["mj_campaign_id"], uint64(7272)},
{fields["mj_contact_id"], uint64(4)},
{fields["mj_campaign_payload"], ""},
{fields["ip"], "127.0.0.1"},
{fields["geo"], "FR"},
{fields["user_agent"], "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0) AppleWebKit/537.36"},
{fields["url"], "https://mailjet.com"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package app
package event
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)
// EventType type
type EventType string
// Type event
type Type string
// Event type enum
const (
OpenEventType EventType = "open"
ClickEventType EventType = "click"
BounceEventType EventType = "bounce"
SpamEventType EventType = "spam"
BlockedEventType EventType = "blocked"
UnsubEventType EventType = "unsub"
SentEventType EventType = "sent"
OpenEventType Type = "open"
ClickEventType Type = "click"
BounceEventType Type = "bounce"
SpamEventType Type = "spam"
BlockedEventType Type = "blocked"
UnsubEventType Type = "unsub"
SentEventType Type = "sent"
)
// EventTypes defines all events
var EventTypes = []EventType{
// Types defines all events types
var Types = []Type{
SentEventType,
OpenEventType,
ClickEventType,
......@@ -33,16 +34,47 @@ var EventTypes = []EventType{
SpamEventType,
}
func Parse(t Type, data io.Reader) (Event, error) {
var event Event
switch t {
case OpenEventType:
event = &OpenEvent{}
case ClickEventType:
event = &ClickEvent{}
case BounceEventType:
event = &BounceEvent{}
case SpamEventType:
event = &SpamEvent{}
case BlockedEventType:
event = &BlockedEvent{}
case UnsubEventType:
event = &UnsubEvent{}
case SentEventType:
event = &SentEvent{}
}
decoder := json.NewDecoder(data)
err := decoder.Decode(&event)
if err != nil {
return nil, err
}
return event, nil
}
// Event interface for all events
type Event interface {
Decode(r *http.Request) error
Encode() []byte
SetID(uint64)
GetTime() time.Time
Bytes() []byte
String() string
Point() (map[string]string, map[string]interface{})
}
// BaseEvent define an event occurred on mailjet side
type BaseEvent struct {
type baseEvent struct {
ID uint64 `json:"id"`
Event string `json:"event"` // the event type
Timestamp uint64 `json:"time"` // Unix timestamp of event
......@@ -58,82 +90,45 @@ type BaseEvent struct {
CustomID string `json:"CustomID"` // the event payload, when provided at send time
}
// Decode mailjet event json body
func (e *BaseEvent) Decode(r *http.Request) error {
decoder := json.NewDecoder(r.Body)
return decoder.Decode(&e)
// SetID for the event
func (e *baseEvent) SetID(value uint64) {
e.ID = value
}
// Encode mailjet event json body
func (e *BaseEvent) Encode() []byte {
buf := bytes.NewBuffer(nil)
encoder := json.NewEncoder(buf)
if err := encoder.Encode(e); err != nil {
log.Fatalln(err)
}
return buf.Bytes()
// GetTime of the event
func (e *baseEvent) GetTime() time.Time {
return time.Unix(int64(e.Timestamp), 0)
}
// String convert an event to a string
func (e *BaseEvent) String() string {
return fmt.Sprintf("id=%d type=%s timestamp=%d", e.ID, e.Event, e.Timestamp)
func (e *baseEvent) String() string {
return fmt.Sprintf("id=%d event=%s timestamp=%d", e.ID, e.Event, e.Timestamp)
}
// SetID set the internal event ID
func (e *BaseEvent) SetID(value uint64) {
e.ID = value
}
// SentEvent define a sent event occurred on mailjet side
type SentEvent struct {
BaseEvent
SMTPReply string `json:"smtp_reply"` // The raw SMTP response message
}
// Point return all tags and fields for influxdb
func (e *baseEvent) Point() (map[string]string, map[string]interface{}) {
tags := make(map[string]string)
tags["type"] = e.Event
// OpenEvent define an open event occurred on mailjet side
type OpenEvent struct {
BaseEvent
IP string `json:"ip"` // IP address (can be IPv4 or IPv6) that triggered the event
Geo string `json:"geo"` // country code of IP address
UserAgent string `json:"agent"` // User-Agent
}
// ClickEvent define a click event occurred on mailjet side
type ClickEvent struct {
OpenEvent
URL string `json:"url"` // the link that was clicked
}
fields := make(map[string]interface{})
fields["id"] = e.ID
fields["email"] = e.Email
fields["mj_message_id"] = e.MailjetMessageID
fields["mj_campaign_id"] = e.MailjetCampaignID
fields["mj_contact_id"] = e.MailjetContactID
fields["mj_campaign_payload"] = e.CustomCampaign
// UnsubEvent define a unsub event occurred on mailjet side
type UnsubEvent struct {
OpenEvent
MailjetListID string `json:"mj_list_id"` // internal Mailjet list ID associated to the contact
return tags, fields
}
// BounceEvent define a bounce event occurred on mailjet side
type BounceEvent struct {
BaseEvent
Blocked bool `json:"blocked"` // true if this bounce leads to the recipient being blocked
HardBounce bool `json:"hard_bounce"` // true if error was permanent
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
Comment string `json:"comment"` // the raw SMTP error code, including descriptions of the reason for the bounce
}
// Bytes convert the event to a json body
func Bytes(e interface{}) []byte {
buf := bytes.NewBuffer(nil)
encoder := json.NewEncoder(buf)
// BlockedEvent define a blocked event occurred on mailjet side
type BlockedEvent struct {
BaseEvent
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
}
if err := encoder.Encode(e); err != nil {
log.Fatalln(err)
}
// SpamEvent define a spam event occurred on mailjet side
type SpamEvent struct {
BaseEvent
Source string `json:"source"` // indicates which feedback loop program reported this complaint
return buf.Bytes()
}
package event
import (
"bytes"
"fmt"
"log"
"os"
"testing"
)
func TestBaseEvent(t *testing.T) {
file, err := os.Open("fixtures/click.json")
if err != nil {
log.Fatalln(err)
}
event, err := Parse(ClickEventType, file)
if err != nil {
t.Error(err)
}
event.SetID(3)
if event.GetTime().Unix() != 1433334653 {
t.Fail()
}
if event.String() != "id=3 event=click timestamp=1433334653" {
t.Fail()
}
}
func testEventParsing(t *testing.T, eventType Type) Event {
file, err := os.Open(fmt.Sprintf("fixtures/%s.json", string(eventType)))
if err != nil {
log.Fatalln(err)
}
defer file.Close()
event, err := Parse(eventType, file)
if err != nil {
t.Error(err)
}
event2, err := Parse(eventType, bytes.NewReader(event.Bytes()))
if err != nil {
t.Error(err)
}
if string(event.Bytes()) != string(event2.Bytes()) {
t.Errorf("event bytes mismatch after 2 parsing")
}
return event
}
{
"event": "blocked",
"time": 1430812195,
"MessageID": 13792286917004336,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "blocked@mailjet.com",
"mj_campaign_id": 380,
"mj_contact_id": 19092,
"customcampaign": "",
"CustomID": "helloworld",
"Payload": "",
"error_related_to": "recipient",
"error": "user unknown"
}
{
"event": "bounce",
"time": 1430812195,
"MessageID": 13792286917004336,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "bounce@mailjet.com",
"mj_campaign_id": 2,
"mj_contact_id": 199,
"customcampaign": "",
"CustomID": "helloworld",
"Payload": "",
"blocked": false,
"hard_bounce": true,
"error_related_to": "recipient",
"error": "user unknown",
"comment": "Host or domain name not found: Host not found"
}
{
"event": "click",
"time": 1433334653,
"MessageID": 19421777836302490,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "api@mailjet.com",
"mj_campaign_id": 7272,
"mj_contact_id": 4,
"customcampaign": "",
"CustomID": "helloworld",
"Payload": "",
"url": "https://mailjet.com",
"ip": "127.0.0.1",
"geo": "FR",
"agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0) AppleWebKit/537.36"
}
{
"event": "open",
"time": 1433103519,
"MessageID": 19421777396190490,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "api@mailjet.com",
"mj_campaign_id": 7173,
"mj_contact_id": 320,
"customcampaign": "",
"CustomID": "helloworld",
"Payload": "",
"ip": "127.0.0.1",
"geo": "US",
"agent": "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0"
}
{
"event": "sent",
"time": 1433333949,
"MessageID": 19421777835146490,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "api@mailjet.com",
"mj_campaign_id": 7257,
"mj_contact_id": 4,
"customcampaign": "",
"mj_message_id": "19421777835146490",
"smtp_reply": "sent (250 2.0.0 OK 1433333948 fa5si855896wjc.199 - gsmtp)",
"CustomID": "helloworld",
"Payload": ""
}
{
"event": "spam",
"time": 1430812195,
"MessageID": 13792286917004336,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "bounce@mailjet.com",
"mj_campaign_id": 1898,
"mj_contact_id": 10920,
"customcampaign": "",
"CustomID": "helloworld",
"Payload": "",
"source": "JMRPP"
}
{
"event": "unsub",
"time": 1433334941,
"MessageID": 20547674933128000,
"Message_GUID": "1ab23cd4-e567-8901-2345-6789f0gh1i2j",
"email": "api@mailjet.com",
"mj_campaign_id": 7276,
"mj_contact_id": 126,
"customcampaign": "",
"CustomID": "helloworld",
"Payload": "",
"mj_list_id": 1,
"ip": "127.0.0.1",
"geo": "FR",
"agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0)"
}
package event
// OpenEvent define an open event occurred on mailjet side
type OpenEvent struct {
baseEvent
IP string `json:"ip"` // IP address (can be IPv4 or IPv6) that triggered the event
Geo string `json:"geo"` // country code of IP address
UserAgent string `json:"agent"` // User-Agent
}
// Point return all tags and fields for influxdb
func (e *OpenEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["ip"] = e.IP
fields["geo"] = e.Geo
fields["user_agent"] = e.UserAgent
return tags, fields
}
// Bytes convert the event to a json body
func (e *OpenEvent) Bytes() []byte {
return Bytes(e)
}
package event
import (
"testing"
)
func TestOpenEvent(t *testing.T) {
event := testEventParsing(t, OpenEventType)
tags, fields := event.Point()
for _, test := range []struct {
in interface{}
out interface{}
}{
{tags["type"], "open"},
{fields["id"], uint64(0)},
{fields["email"], "api@mailjet.com"},
{fields["mj_message_id"], uint64(19421777396190490)},
{fields["mj_campaign_id"], uint64(7173)},
{fields["mj_contact_id"], uint64(320)},
{fields["mj_campaign_payload"], ""},
{fields["ip"], "127.0.0.1"},
{fields["geo"], "US"},
{fields["user_agent"], "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0"},
} {
if test.in != test.out {
t.Errorf("%s != %s", test.in, test.out)
}
}
}
package event
// SentEvent define a sent event occurred on mailjet side
type SentEvent struct {
baseEvent
SMTPReply string `json:"smtp_reply"` // The raw SMTP response message
}
// Point return all tags and fields for influxdb
func (e *SentEvent) Point() (map[string]string, map[string]interface{}) {
tags, fields := e.baseEvent.Point()
fields["smtp_reply"] = e.SMTPReply