...
 
Commits (4)
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
/jetstats*
---
image: golang:latest
stages:
- test
- build
lint:
stage: test
script:
- make lint
test:
stage: test
script:
- make test
build:
stage: build
script:
- make build
only:
- tags
artifacts:
paths:
- jetstats
linters:
enable:
- bodyclose
- deadcode
- depguard
- dogsled
- dupl
- errcheck
- funlen
- gocognit
- goconst
- gocritic
- gocyclo
- godox
- gofmt
- goimports
- golint
- gomnd
- goprintffuncname
- gosec
- gosimple
- govet
- ineffassign
- interfacer
- lll
- maligned
- misspell
- nakedret
- prealloc
- rowserrcheck
- scopelint
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
- unused
- varcheck
- whitespace
- wsl
.DEFAULT_GOAL := run
SHELL := bash
install:
go get
build:
go build ./cmd/jetstats
run:
go run ./cmd/jetstats
lint:
golangci-lint --color always run
test:
go test -v ./...
# mailstats
# jetstats
package jetstats
import (
"github.com/gorilla/mux"
mailjet "github.com/mailjet/mailjet-apiv3-go"
"go.etcd.io/bbolt"
)
// App is a mailstats application
type App struct {
config *Config
store *bbolt.DB
router *mux.Router
mailjetClient *mailjet.Client
}
// NewApp creates a new app and initiate its components
func NewApp(config *Config) *App {
a := &App{}
a.config = config
a.store = NewStore(a)
a.mailjetClient = NewMailjetClient(a)
return a
}
// Close closes all application components
func (a *App) Close() {
a.store.Close()
}
package jetstats
import (
"net/http"
)
// WebhookAuthMiddleware handle a basic authentification
func (a *App) WebhookAuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
user, pass, ok := r.BasicAuth()
if ok && a.IsValidWebhookAuth(user, pass) {
next.ServeHTTP(w, r)
return
}
http.Error(w, "Forbidden", http.StatusForbidden)
})
}
// IsValidWebhookAuth will check if the webhooks credentials are valid
func (a *App) IsValidWebhookAuth(user string, pass string) bool {
return user == a.config.WebhookUser && pass == a.config.WebhookPass
}
package jetstats
import (
"log"
"net/url"
"github.com/jinzhu/configor"
)
// Config defines the application configuration
type Config struct {
ListenAddr string `default:"127.0.0.1:7000" yaml:"listen_addr"`
PublicURL string `required:"true" yaml:"public_url"`
publicURL *url.URL
Database string `default:"data.db" yaml:"database"`
WebhookUser string `required:"true" yaml:"webhook_user"`
WebhookPass string `required:"true" yaml:"webhook_pass"`
MailjetUser string `required:"true" yaml:"mailjet_user"`
MailjetPass string `required:"true" yaml:"mailjet_pass"`
}
// NewConfig return a new configuration from path
func NewConfig(path string) *Config {
c := Config{}
if err := configor.Load(&c, path); err != nil {
log.Fatalln(err)
}
if err := c.Validate(); err != nil {
log.Fatalln(err)
}
return &c
}
// Validate check whether the configuration is valid
func (c *Config) Validate() error {
parsedURL, err := url.Parse(c.PublicURL)
if err != nil {
return err
}
c.publicURL = parsedURL
return nil
}
package jetstats
// EventType type
type EventType string
// Event type enum
const (
OpenEventType EventType = "open"
ClickEventType EventType = "click"
BounceEventType EventType = "bounce"
SpamEventType EventType = "spam"
BlockedEventType EventType = "blocked"
UnsubEventType EventType = "unsub"
SentEventType EventType = "sent"
)
// EventTypes defines all events
var EventTypes = []EventType{
SentEventType,
OpenEventType,
ClickEventType,
UnsubEventType,
BounceEventType,
BlockedEventType,
SpamEventType,
}
package jetstats
import (
mailjet "github.com/mailjet/mailjet-apiv3-go"
)
// NewMailjetClient check whether the mailjet hooks are setup and valid
func NewMailjetClient(a *App) *mailjet.Client {
return mailjet.NewMailjetClient(a.config.MailjetUser, a.config.MailjetPass)
}
package jetstats
import (
"bytes"
"encoding/json"
"log"
"net/http"
)
// Event interface for all events
type Event interface {
Decode(r *http.Request) error
Encode() []byte
SetID(uint64)
}
// BaseEvent define an event occurred on mailjet side
type BaseEvent struct {
ID uint64 `json:"id"`
Event string `json:"event"` // the event type
Timestamp uint64 `json:"time"` // Unix timestamp of event
Email string `json:"email"` // email address of recipient triggering the event
MailjetMessageID uint64 `json:"MessageID"` // internal Mailjet message ID
MailjetCampaignID uint64 `json:"mj_campaign_id"` // internal Mailjet campaign ID associated to the message
MailjetContactID uint64 `json:"mj_contact_id"` // internal Mailjet contact ID
CustomCampaign string `json:"customcampaign"` // value of the X-Mailjet-Campaign header when provided
Payload string `json:"Payload"` // the custom ID, when provided at send time
CustomID string `json:"CustomID"` // the event payload, when provided at send time
}
// Decode mailjet event json body
func (e *BaseEvent) Decode(r *http.Request) error {
decoder := json.NewDecoder(r.Body)
return decoder.Decode(&e)
}
// Encode mailjet event json body
func (e *BaseEvent) Encode() []byte {
buf := bytes.NewBuffer(nil)
encoder := json.NewEncoder(buf)
if err := encoder.Encode(e); err != nil {
log.Fatalln(err)
}
return buf.Bytes()
}
// SetID set the internal event ID
func (e *BaseEvent) SetID(value uint64) {
e.ID = value
}
// SentEvent define a sent event occurred on mailjet side
type SentEvent struct {
BaseEvent
SMTPReply string `json:"smtp_reply"` // The raw SMTP response message
}
// OpenEvent define an open event occurred on mailjet side
type OpenEvent struct {
BaseEvent
IP string `json:"ip"` // IP address (can be IPv4 or IPv6) that triggered the event
Geo string `json:"geo"` // country code of IP address
UserAgent string `json:"agent"` // User-Agent
}
// ClickEvent define a click event occurred on mailjet side
type ClickEvent struct {
OpenEvent
URL string `json:"url"` // the link that was clicked
}
// UnsubEvent define a unsub event occurred on mailjet side
type UnsubEvent struct {
OpenEvent
MailjetListID string `json:"mj_list_id"` // internal Mailjet list ID associated to the contact
}
// BounceEvent define a bounce event occurred on mailjet side
type BounceEvent struct {
BaseEvent
Blocked bool `json:"blocked"` // true if this bounce leads to the recipient being blocked
HardBounce bool `json:"hard_bounce"` // true if error was permanent
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
Comment string `json:"comment"` // the raw SMTP error code, including descriptions of the reason for the bounce
}
// BlockedEvent define a blocked event occurred on mailjet side
type BlockedEvent struct {
BaseEvent
// See https://dev.mailjet.com/email/guides/webhooks/#possible-values-for-errors
ErrorRelatedTo string `json:"error_related_to"`
Error string `json:"error"`
}
// SpamEvent define a spam event occurred on mailjet side
type SpamEvent struct {
BaseEvent
Source string `json:"source"` // indicates which feedback loop program reported this complaint
}
package jetstats
import (
mailjet "github.com/mailjet/mailjet-apiv3-go"
"github.com/mailjet/mailjet-apiv3-go/resources"
)
func (a *App) mailjetListWebhooks() ([]resources.Eventcallbackurl, error) {
var data []resources.Eventcallbackurl
_, _, err := a.mailjetClient.List("eventcallbackurl", &data)
if err != nil {
return nil, err
}
return data, nil
}
func (a *App) mailjetDeleteWebhooks(webhooks []resources.Eventcallbackurl) error {
for _, webhook := range webhooks {
err := a.mailjetClient.Delete(&mailjet.Request{
Resource: "eventcallbackurl",
ID: webhook.ID,
})
if err != nil {
return err
}
}
return nil
}
func (a *App) mailjetCreateWebhook(payload resources.Eventcallbackurl) error {
return a.mailjetClient.Post(&mailjet.FullRequest{
Info: &mailjet.Request{Resource: "eventcallbackurl"},
Payload: &payload,
}, nil)
}
package jetstats
import (
"log"
"github.com/gorilla/mux"
)
// NewRouter creates a router
func NewRouter(a *App) *mux.Router {
r := mux.NewRouter()
log.Println("creating router with base url '" + a.config.publicURL.Path + "'")
base := r.PathPrefix(a.config.publicURL.Path).Subrouter()
webhooks := base.PathPrefix("/webhooks").Subrouter()
webhooks.Use(a.WebhookAuthMiddleware)
InitWebhooksHandlers(a, webhooks)
return r
}
package jetstats
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"time"
)
// Timeout for http server in seconds
const Timeout = 15
// Serve create routes and start listening for calls
func (a *App) Serve() {
a.router = NewRouter(a)
srv := &http.Server{
Addr: a.config.ListenAddr,
WriteTimeout: time.Second * Timeout,
ReadTimeout: time.Second * Timeout,
IdleTimeout: time.Second * Timeout,
Handler: a.router,
}
go func() {
log.Println("starting server at 'http://" + a.config.ListenAddr + "'")
if err := srv.ListenAndServe(); err != nil {
log.Println(err)
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
log.Println("shutting down server")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*Timeout)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatalln(err)
}
}
package jetstats
import (
"fmt"
"log"
"go.etcd.io/bbolt"
)
// NewStore creates a store
func NewStore(a *App) *bbolt.DB {
db, err := bbolt.Open(a.config.Database, 0600, nil)
if err != nil {
log.Fatalln(err)
}
if err := db.Update(func(tx *bbolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists([]byte("event")); err != nil {
return err
}
if _, err := tx.CreateBucketIfNotExists([]byte("queue")); err != nil {
return err
}
return nil
}); err != nil {
log.Fatalln(err)
}
return db
}
// SaveEvent store the event and add it to the queue
func (a *App) SaveEvent(eventType EventType, event Event) error {
return a.store.Update(func(tx *bbolt.Tx) error {
eventBucket := tx.Bucket([]byte("event"))
queueBucket := tx.Bucket([]byte("queue"))
id, _ := eventBucket.NextSequence()
event.SetID(id)
key := fmt.Sprintf("%s_%d", eventType, id)
payload := event.Encode()
if err := eventBucket.Put([]byte(key), payload); err != nil {
return err
}
if err := queueBucket.Put([]byte(key), payload); err != nil {
return err
}
return nil
})
}
package jetstats
import (
"errors"
"net/url"
"path"
"github.com/mailjet/mailjet-apiv3-go/resources"
)
func (a *App) createWebhookURL(eventType EventType) string {
u := *a.config.publicURL
u.Path = path.Join(u.Path, "webhooks", string(eventType))
u.User = url.UserPassword(a.config.WebhookUser, a.config.WebhookPass)
return u.String()
}
// WebhooksStatus check if mailjet webhooks are valid and all set
func (a *App) WebhooksStatus() ([]resources.Eventcallbackurl, error) {
webhooks, err := a.mailjetListWebhooks()
if err != nil {
return nil, err
}
var validWebhooks []resources.Eventcallbackurl
for _, eventType := range EventTypes {
for _, webhook := range webhooks {
if string(eventType) == webhook.EventType {
if !webhook.IsBackup &&
webhook.Status == "alive" &&
webhook.URL == a.createWebhookURL(eventType) {
validWebhooks = append(validWebhooks, webhook)
}
}
}
}
if len(validWebhooks) != len(EventTypes) {
return validWebhooks, errors.New("mailjet webhooks are not set or not valid")
}
return validWebhooks, nil
}
// WebhooksSetup delete old mailjet webhooks and recreate them
func (a *App) WebhooksSetup() error {
if err := a.WebhooksDelete(); err != nil {
return err
}
for _, eventType := range EventTypes {
err := a.mailjetCreateWebhook(resources.Eventcallbackurl{
EventType: string(eventType),
Status: "alive",
URL: a.createWebhookURL(eventType),
})
if err != nil {
return err
}
}
return nil
}
// WebhooksDelete delete old mailjet webhooks
func (a *App) WebhooksDelete() error {
webhooks, err := a.mailjetListWebhooks()
if err != nil {
return err
}
if err := a.mailjetDeleteWebhooks(webhooks); err != nil {
return err
}
return nil
}
package jetstats
import (
"log"
"net/http"
"github.com/gorilla/mux"
)
// InitWebhooksHandlers register all webhooks routes
func InitWebhooksHandlers(a *App, r *mux.Router) {
r.HandleFunc("/open", a.WebhookOpenHandler).Methods(http.MethodPost)
r.HandleFunc("/click", a.WebhookClickHandler).Methods(http.MethodPost)
r.HandleFunc("/bounce", a.WebhookBounceHandler).Methods(http.MethodPost)
r.HandleFunc("/spam", a.WebhookSpamHandler).Methods(http.MethodPost)
r.HandleFunc("/blocked", a.WebhookBlockedHandler).Methods(http.MethodPost)
r.HandleFunc("/unsub", a.WebhookUnsubHandler).Methods(http.MethodPost)
r.HandleFunc("/sent", a.WebhookSentHandler).Methods(http.MethodPost)
}
func (a *App) webhookHandler(w http.ResponseWriter, r *http.Request, eventType EventType) {
var event Event
switch eventType {
case OpenEventType:
event = &OpenEvent{}
case ClickEventType:
event = &ClickEvent{}
case BounceEventType:
event = &BounceEvent{}
case SpamEventType:
event = &SpamEvent{}
case BlockedEventType:
event = &BlockedEvent{}
case UnsubEventType:
event = &UnsubEvent{}
case SentEventType:
event = &SentEvent{}
}
if err := event.Decode(r); err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Fatalln(err)
}
if err := a.SaveEvent(eventType, event); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Fatalln(err)
return
}
w.WriteHeader(http.StatusOK)
}
// WebhookOpenHandler writes data from the mail provider to the database
func (a *App) WebhookOpenHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, OpenEventType)
}
// WebhookClickHandler writes data from the mail provider to the database
func (a *App) WebhookClickHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, ClickEventType)
}
// WebhookBounceHandler writes data from the mail provider to the database
func (a *App) WebhookBounceHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, BounceEventType)
}
// WebhookSpamHandler writes data from the mail provider to the database
func (a *App) WebhookSpamHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, SpamEventType)
}
// WebhookBlockedHandler writes data from the mail provider to the database
func (a *App) WebhookBlockedHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, BlockedEventType)
}
// WebhookUnsubHandler writes data from the mail provider to the database
func (a *App) WebhookUnsubHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, UnsubEventType)
}
// WebhookSentHandler writes data from the mail provider to the database
func (a *App) WebhookSentHandler(w http.ResponseWriter, r *http.Request) {
a.webhookHandler(w, r, SentEventType)
}
package main
import (
"log"
"github.com/spf13/cobra"
)
var rootCmd = &cobra.Command{
Use: "jetstats",
}
var (
configPath string
)
func init() {
rootCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "jetstats.yml", "configuration file")
rootCmd.AddCommand(serverCmd)
rootCmd.AddCommand(webhooksCmd)
}
func main() {
if err := rootCmd.Execute(); err != nil {
log.Fatalln(err)
}
}
package main
import (
"git.oolsa.net/jooola/mailstats/app"
"github.com/spf13/cobra"
)
var serverCmd = &cobra.Command{
Use: "server",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
a.Serve()
defer a.Close()
},
}
package main
import (
"fmt"
"log"
"git.oolsa.net/jooola/mailstats/app"
"github.com/spf13/cobra"
)
var webhooksCmd = &cobra.Command{
Use: "webhooks",
}
func init() {
webhooksCmd.AddCommand(webhooksStatusCmd)
webhooksCmd.AddCommand(webhooksSetupCmd)
webhooksCmd.AddCommand(webhooksDeleteCmd)
}
var webhooksStatusCmd = &cobra.Command{
Use: "status",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
webhooks, err := a.WebhooksStatus()
for _, webhook := range webhooks {
fmt.Printf("%s: %s\tbackup: %t\turl: %s\n", webhook.EventType, webhook.Status, webhook.IsBackup, webhook.URL)
}
if err != nil {
log.Fatalln(err)
}
defer a.Close()
},
}
var webhooksSetupCmd = &cobra.Command{
Use: "setup",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
if err := a.WebhooksSetup(); err != nil {
log.Fatalln(err)
}
defer a.Close()
},
}
var webhooksDeleteCmd = &cobra.Command{
Use: "delete",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewApp(app.NewConfig(configPath))
if err := a.WebhooksDelete(); err != nil {
log.Fatalln(err)
}
defer a.Close()
},
}
module git.oolsa.net/jooola/mailstats
go 1.14
require (
github.com/gorilla/mux v1.7.4
github.com/jinzhu/configor v1.2.0
github.com/mailjet/mailjet-apiv3-go v0.0.0-20190724151621-55e56f74078c
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5 // indirect
go.etcd.io/bbolt v1.3.4
golang.org/x/sys v0.0.0-20200513112337-417ce2331b5c // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
This diff is collapsed.