...
 
Commits (4)
......@@ -4,18 +4,15 @@ import (
"git.oolsa.net/jooola/jetstats/app/mailjet"
"git.oolsa.net/jooola/jetstats/sender"
"git.oolsa.net/jooola/jetstats/store"
"github.com/gorilla/mux"
)
// App is a jetstats application
type App struct {
config *Config
router *mux.Router
mailjet *mailjet.Mailjet
config *Config
store store.Store
sender sender.Sender
mailjet *mailjet.Mailjet
store store.Store
sender sender.Sender
}
// NewApp creates a new app and initiate its components
......@@ -30,14 +27,6 @@ func NewApp(config *Config) *App {
return a
}
// NewServer creates a server from a new app
func NewServer(config *Config) *App {
a := NewApp(config)
a.router = NewRouter(a)
return a
}
// Close closes all application components
func (a *App) Close() {
a.store.Close()
......
......@@ -13,7 +13,7 @@ func (a *App) WebhookAuthMiddleware(next http.Handler) http.Handler {
return
}
http.Error(w, "Forbidden", http.StatusForbidden)
WriteHeader(w, http.StatusForbidden)
})
}
......
......@@ -9,9 +9,10 @@ import (
// Config defines the application configuration
type Config struct {
ListenAddr string `env:"LISTEN_ADDR" envDefault:"127.0.0.1:3000"`
PublicURL string `env:"PUBLIC_URL,required"`
publicURL *url.URL
PublicAddr string `env:"PUBLIC_ADDR" envDefault:"127.0.0.1:7000"`
MetricsAddr string `env:"METRICS_ADDR" envDefault:"127.0.0.1:9123"`
PublicURL string `env:"PUBLIC_URL,required"`
publicURL *url.URL
Verbose bool `env:"VERBOSE" envDefault:"false"`
Debug bool `env:"DEBUG" envDefault:"false"`
......
......@@ -3,34 +3,28 @@ package app
import (
"log"
"net/http"
"strconv"
"git.oolsa.net/jooola/jetstats/events"
"github.com/gorilla/mux"
)
// InitWebhooksHandlers register all webhooks routes
func InitWebhooksHandlers(a *App, r *mux.Router) {
r.HandleFunc("/open", a.WebhookOpenHandler).Methods(http.MethodPost)
r.HandleFunc("/click", a.WebhookClickHandler).Methods(http.MethodPost)
r.HandleFunc("/bounce", a.WebhookBounceHandler).Methods(http.MethodPost)
r.HandleFunc("/spam", a.WebhookSpamHandler).Methods(http.MethodPost)
r.HandleFunc("/blocked", a.WebhookBlockedHandler).Methods(http.MethodPost)
r.HandleFunc("/unsub", a.WebhookUnsubHandler).Methods(http.MethodPost)
r.HandleFunc("/sent", a.WebhookSentHandler).Methods(http.MethodPost)
// WriteHeader write the response code and increment metrics counter
func WriteHeader(w http.ResponseWriter, statusCode int) {
w.WriteHeader(statusCode)
metricsReqCnt.WithLabelValues(strconv.Itoa(statusCode)).Inc()
}
func (a *App) webhookHandler(w http.ResponseWriter, r *http.Request, eventType string) {
event, err := events.Parse(eventType, r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
WriteHeader(w, http.StatusBadRequest)
log.Fatal(err)
return
}
if err := a.store.Save(event); err != nil {
w.WriteHeader(http.StatusInternalServerError)
WriteHeader(w, http.StatusInternalServerError)
log.Fatal(err)
return
......@@ -39,7 +33,7 @@ func (a *App) webhookHandler(w http.ResponseWriter, r *http.Request, eventType s
// Send to remote storage.
a.sender.Send(event)
w.WriteHeader(http.StatusOK)
WriteHeader(w, http.StatusOK)
}
// WebhookOpenHandler writes data from the mail provider to the database
......
package app
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
metricsReqCnt = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "jetstats_metric_handler_requests_total",
Help: "Total number of requests by HTTP status code.",
}, []string{"code"})
)
......@@ -9,26 +9,26 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Timeout for http server in seconds
const Timeout = 15
// ServerTimeout for http server in seconds
const ServerTimeout = 15
// Serve create routes and start listening for calls
// Serve create the servers and start listening for calls
func (a *App) Serve() {
srv := &http.Server{
Addr: a.config.ListenAddr,
WriteTimeout: time.Second * Timeout,
ReadTimeout: time.Second * Timeout,
IdleTimeout: time.Second * Timeout,
Handler: a.router,
}
publicSrv := a.NewPublicServer()
metricsSrv := a.NewMetricServer()
go func() {
log.Println("starting server")
if err := publicSrv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()
if err := srv.ListenAndServe(); err != nil {
log.Println(err)
go func() {
if err := metricsSrv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()
......@@ -36,26 +36,50 @@ func (a *App) Serve() {
signal.Notify(c, os.Interrupt)
<-c
log.Println("stopping server")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*Timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*ServerTimeout)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatalln(err)
if err := publicSrv.Shutdown(ctx); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
if err := metricsSrv.Shutdown(ctx); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}
// NewRouter creates a router
func NewRouter(a *App) *mux.Router {
r := mux.NewRouter()
func newServer(addr string, router *mux.Router) *http.Server {
return &http.Server{
Addr: addr,
WriteTimeout: time.Second * ServerTimeout,
ReadTimeout: time.Second * ServerTimeout,
IdleTimeout: time.Second * ServerTimeout,
Handler: router,
}
}
log.Println("creating router with public url '" + a.config.PublicURL + "'")
base := r.PathPrefix(a.config.publicURL.Path).Subrouter()
// NewPublicServer return a new public server
func (a *App) NewPublicServer() *http.Server {
router := mux.NewRouter()
base := router.PathPrefix(a.config.publicURL.Path).Subrouter()
webhooks := base.PathPrefix("/webhooks").Subrouter()
webhooks.Use(a.WebhookAuthMiddleware)
InitWebhooksHandlers(a, webhooks)
webhooks.HandleFunc("/open", a.WebhookOpenHandler).Methods(http.MethodPost)
webhooks.HandleFunc("/click", a.WebhookClickHandler).Methods(http.MethodPost)
webhooks.HandleFunc("/bounce", a.WebhookBounceHandler).Methods(http.MethodPost)
webhooks.HandleFunc("/spam", a.WebhookSpamHandler).Methods(http.MethodPost)
webhooks.HandleFunc("/blocked", a.WebhookBlockedHandler).Methods(http.MethodPost)
webhooks.HandleFunc("/unsub", a.WebhookUnsubHandler).Methods(http.MethodPost)
webhooks.HandleFunc("/sent", a.WebhookSentHandler).Methods(http.MethodPost)
return newServer(a.config.PublicAddr, router)
}
// NewMetricServer return a new metrics server
func (a *App) NewMetricServer() *http.Server {
router := mux.NewRouter()
router.Handle("/metrics", promhttp.Handler())
return r
return newServer(a.config.MetricsAddr, router)
}
......@@ -10,9 +10,9 @@ var serverCmd = &cobra.Command{
Use: "server",
Short: "Listen for mailjet events using webhooks",
Run: func(cmd *cobra.Command, args []string) {
a := app.NewServer(app.NewConfig())
a := app.NewApp(app.NewConfig())
defer a.Close()
a.Serve()
defer a.Close()
},
}
This diff is collapsed.
......@@ -4,12 +4,11 @@ import (
"log"
"git.oolsa.net/jooola/jetstats/cmd"
"github.com/subosito/gotenv"
"git.oolsa.net/jooola/jetstats/x"
)
func main() {
if err := gotenv.Load(); err != nil {
if err := x.LoadEnv(); err != nil {
log.Fatal(err)
}
......
......@@ -51,7 +51,7 @@ typeset -r token=$(docker exec -it "$container_name" influx auth create \
--json | jq -r ".token" | sed -e 's/^[[:space:]]*//')
cat << EOF > .env
LISTEN_ADDR=:7000
PUBLIC_ADDR=:7000
PUBLIC_URL=http://localhost:7000
VERBOSE=true
DEBUG=true
......
......@@ -2,6 +2,7 @@ package boltdb
import (
"log"
"time"
"git.oolsa.net/jooola/jetstats/events"
......@@ -40,7 +41,7 @@ func New() *Store {
// NewWithConfig create a new store using the configuration
func NewWithConfig(config *Config) *Store {
db, err := bolt.Open(config.Database, 0600, &bolt.Options{})
db, err := bolt.Open(config.Database, 0600, &bolt.Options{Timeout: time.Second})
if err != nil {
log.Fatal(err)
}
......
package x
import (
"github.com/subosito/gotenv"
)
// LoadEnv load the .env file if it exists
func LoadEnv() error {
filename := ".env"
if FileExist(filename) {
return gotenv.Load(filename)
}
return nil
}
package x
import (
"os"
)
// FileExist return whether a file exists
func FileExist(path string) bool {
info, err := os.Stat(path)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}