From 5f919cc9dd29300ada4fa8220d51a6dbc09105a1 Mon Sep 17 00:00:00 2001 From: Ralph Slooten Date: Thu, 4 May 2023 21:48:09 +1200 Subject: [PATCH] Feature: Option to ignore duplicate Message-IDs This option (default off) silently ignores any new messages with duplicate Message-IDs. This update includes a new database structure and automatic rebuild of existing data. --- cmd/root.go | 7 +- config/config.go | 3 + server/smtpd/smtpd.go | 16 ++- server/ui/api/v1/swagger.json | 4 + storage/database.go | 154 +++++++++++++++++++------- storage/migrationTasks.go | 200 ++++++++++++++++++++++++++++++++++ storage/search.go | 27 +++-- storage/structs.go | 18 +-- 8 files changed, 363 insertions(+), 66 deletions(-) create mode 100644 storage/migrationTasks.go diff --git a/cmd/root.go b/cmd/root.go index c639323..b691f1e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -87,6 +87,7 @@ func init() { rootCmd.Flags().StringVar(&config.Webroot, "webroot", config.Webroot, "Set the webroot for web UI & API") rootCmd.Flags().StringVar(&server.AccessControlAllowOrigin, "api-cors", server.AccessControlAllowOrigin, "Set API CORS Access-Control-Allow-Origin header") rootCmd.Flags().BoolVar(&config.UseMessageDates, "use-message-dates", config.UseMessageDates, "Use message dates as the received dates") + rootCmd.Flags().BoolVar(&config.IgnoreDuplicateIDs, "ignore-duplicate-ids", config.IgnoreDuplicateIDs, "Ignore duplicate messages (by Message-Id)") rootCmd.Flags().StringVar(&config.UIAuthFile, "ui-auth-file", config.UIAuthFile, "A password file for web UI authentication") rootCmd.Flags().StringVar(&config.UITLSCert, "ui-tls-cert", config.UITLSCert, "TLS certificate for web UI (HTTPS) - requires ui-tls-key") @@ -97,11 +98,11 @@ func init() { rootCmd.Flags().StringVar(&config.SMTPTLSCert, "smtp-tls-cert", config.SMTPTLSCert, "TLS certificate for SMTP (STARTTLS) - requires smtp-tls-key") rootCmd.Flags().StringVar(&config.SMTPTLSKey, "smtp-tls-key", config.SMTPTLSKey, "TLS key for SMTP (STARTTLS) - 
requires smtp-tls-cert") rootCmd.Flags().BoolVar(&config.SMTPAuthAllowInsecure, "smtp-auth-allow-insecure", config.SMTPAuthAllowInsecure, "Enable insecure PLAIN & LOGIN authentication") - rootCmd.Flags().StringVarP(&config.SMTPCLITags, "tag", "t", config.SMTPCLITags, "Tag new messages matching filters") rootCmd.Flags().StringVar(&config.SMTPRelayConfigFile, "smtp-relay-config", config.SMTPRelayConfigFile, "SMTP configuration file to allow releasing messages") rootCmd.Flags().BoolVar(&config.SMTPRelayAllIncoming, "smtp-relay-all", config.SMTPRelayAllIncoming, "Relay all incoming messages via external SMTP server (caution!)") + rootCmd.Flags().StringVarP(&config.SMTPCLITags, "tag", "t", config.SMTPCLITags, "Tag new messages matching filters") rootCmd.Flags().BoolVarP(&logger.QuietLogging, "quiet", "q", logger.QuietLogging, "Quiet logging (errors only)") rootCmd.Flags().BoolVarP(&logger.VerboseLogging, "verbose", "v", logger.VerboseLogging, "Verbose logging") @@ -201,8 +202,8 @@ func initConfigFromEnv() { if getEnabledFromEnv("MP_USE_MESSAGE_DATES") { config.UseMessageDates = true } - if getEnabledFromEnv("MP_USE_MESSAGE_DATES") { - config.UseMessageDates = true + if getEnabledFromEnv("MP_IGNORE_DUPLICATE_IDS") { + config.IgnoreDuplicateIDs = true } if getEnabledFromEnv("MP_QUIET") { logger.QuietLogging = true diff --git a/config/config.go b/config/config.go index 75aac98..de65bab 100644 --- a/config/config.go +++ b/config/config.go @@ -65,6 +65,9 @@ var ( // SMTPAuthAcceptAny accepts any username/password including none SMTPAuthAcceptAny bool + // IgnoreDuplicateIDs will skip messages with the same ID + IgnoreDuplicateIDs bool + // SMTPCLITags is used to map the CLI args SMTPCLITags string diff --git a/server/smtpd/smtpd.go b/server/smtpd/smtpd.go index 0e26373..1c79186 100644 --- a/server/smtpd/smtpd.go +++ b/server/smtpd/smtpd.go @@ -24,12 +24,19 @@ func mailHandler(origin net.Addr, from string, to []string, data []byte) error { return err } + messageID := 
strings.Trim(msg.Header.Get("Message-Id"), "<>") + // add a message ID if not set - if msg.Header.Get("Message-Id") == "" { + if messageID == "" { // generate unique ID - uid := uuid.NewV4().String() + "@mailpit" + messageID = uuid.NewV4().String() + "@mailpit" // add unique ID - data = append([]byte("Message-Id: <"+uid+">\r\n"), data...) + data = append([]byte("Message-Id: <"+messageID+">\r\n"), data...) + } else if config.IgnoreDuplicateIDs { + if storage.MessageIDExists(messageID) { + logger.Log().Debugf("[smtpd] duplicate message found, ignoring %s", messageID) + return nil + } } // if enabled, this will route the email 1:1 through to the preconfigured smtp server @@ -81,7 +88,8 @@ func mailHandler(origin net.Addr, from string, to []string, data []byte) error { logger.Log().Debugf("[smtpd] added missing addresses to Bcc header: %s", strings.Join(missingAddresses, ", ")) } - if _, err := storage.Store(data); err != nil { - logger.Log().Errorf("[db] error storing message: %d", err.Error()) + _, err = storage.Store(data) + if err != nil { + logger.Log().Errorf("[db] error storing message: %s", err.Error()) return err diff --git a/server/ui/api/v1/swagger.json b/server/ui/api/v1/swagger.json index e38cfbe..3336174 100644 --- a/server/ui/api/v1/swagger.json +++ b/server/ui/api/v1/swagger.json @@ -655,6 +655,10 @@ "$ref": "#/definitions/Attachment" } }, + "MessageID": { + "description": "Message ID", + "type": "string" + }, "Read": { "description": "Read status", "type": "boolean" diff --git a/storage/database.go b/storage/database.go index 436d3a1..3c4637f 100644 --- a/storage/database.go +++ b/storage/database.go @@ -72,20 +72,55 @@ var ( Script: `ALTER TABLE mailbox ADD COLUMN Tags Text NOT NULL DEFAULT '[]'; CREATE INDEX IF NOT EXISTS idx_tags ON mailbox (Tags);`, }, + { + Version: 1.2, + Description: "Creating new mailbox format", + Script: `CREATE TABLE IF NOT EXISTS mailboxtmp ( + Created INTEGER NOT NULL, + ID TEXT NOT NULL, + MessageID TEXT NOT NULL, + Subject TEXT NOT NULL, + Metadata TEXT, + Size INTEGER NOT NULL, + Inline
INTEGER NOT NULL, + Attachments INTEGER NOT NULL, + Read INTEGER, + Tags TEXT, + SearchText TEXT + ); + INSERT INTO mailboxtmp + (Created, ID, MessageID, Subject, Metadata, Size, Inline, Attachments, SearchText, Read, Tags) + SELECT + Sort, ID, '', json_extract(Data, '$.Subject'),Data, + json_extract(Data, '$.Size'), json_extract(Data, '$.Inline'), json_extract(Data, '$.Attachments'), + Search, Read, Tags + FROM mailbox; + + DROP TABLE IF EXISTS mailbox; + ALTER TABLE mailboxtmp RENAME TO mailbox; + CREATE INDEX IF NOT EXISTS idx_created ON mailbox (Created); + CREATE UNIQUE INDEX IF NOT EXISTS idx_id ON mailbox (ID); + CREATE INDEX IF NOT EXISTS idx_message_id ON mailbox (MessageID); + CREATE INDEX IF NOT EXISTS idx_subject ON mailbox (Subject); + CREATE INDEX IF NOT EXISTS idx_size ON mailbox (Size); + CREATE INDEX IF NOT EXISTS idx_inline ON mailbox (Inline); + CREATE INDEX IF NOT EXISTS idx_attachments ON mailbox (Attachments); + CREATE INDEX IF NOT EXISTS idx_read ON mailbox (Read); + CREATE INDEX IF NOT EXISTS idx_tags ON mailbox (Tags);`, + }, } ) // DBMailSummary struct for storing mail summary type DBMailSummary struct { - Created time.Time - From *mail.Address - To []*mail.Address - Cc []*mail.Address - Bcc []*mail.Address - Subject string - Size int - Inline int - Attachments int + From *mail.Address + To []*mail.Address + Cc []*mail.Address + Bcc []*mail.Address + // Subject string + // Size int + // Inline int + // Attachments int } // InitDB will initialise the database @@ -144,6 +179,8 @@ func InitDB() error { // auto-prune & delete go dbCron() + go dataMigrations() + return nil } @@ -189,22 +226,21 @@ func Store(body []byte) (string, error) { from = &mail.Address{Name: env.GetHeader("From")} } + messageID := strings.Trim(env.Root.Header.Get("Message-ID"), "<>") + obj := DBMailSummary{ - Created: time.Now(), - From: from, - To: addressToSlice(env, "To"), - Cc: addressToSlice(env, "Cc"), - Bcc: addressToSlice(env, "Bcc"), - Subject: 
env.GetHeader("Subject"), - Size: len(body), - Inline: len(env.Inlines), - Attachments: len(env.Attachments), + From: from, + To: addressToSlice(env, "To"), + Cc: addressToSlice(env, "Cc"), + Bcc: addressToSlice(env, "Bcc"), } + created := time.Now() + // use message date instead of created date if config.UseMessageDates { if mDate, err := env.Date(); err == nil { - obj.Created = mDate + created = mDate } } @@ -237,8 +273,14 @@ func Store(body []byte) (string, error) { // roll back if it fails defer tx.Rollback() + subject := env.GetHeader("Subject") + size := len(body) + inline := len(env.Inlines) + attachments := len(env.Attachments) + // insert mail summary data - _, err = tx.Exec("INSERT INTO mailbox(ID, Data, Search, Tags, Read) values(?,?,?,?,0)", id, string(summaryJSON), searchText, string(tagJSON)) + _, err = tx.Exec("INSERT INTO mailbox(Created, ID, MessageID, Subject, Metadata, Size, Inline, Attachments, SearchText, Tags, Read) values(?,?,?,?,?,?,?,?,?,?,0)", + created.UnixMilli(), id, messageID, subject, string(summaryJSON), size, inline, attachments, searchText, string(tagJSON)) if err != nil { return "", err } @@ -259,9 +301,12 @@ func Store(body []byte) (string, error) { return "", err } - c.Tags = tagData - + c.Created = created c.ID = id + c.Attachments = attachments + c.Subject = subject + c.Size = size + c.Tags = tagData websockets.Broadcast("new", c) @@ -276,24 +321,28 @@ func List(start, limit int) ([]MessageSummary, error) { results := []MessageSummary{} q := sqlf.From("mailbox"). - Select(`ID, Data, Tags, Read`). - OrderBy("Sort DESC"). + Select(`Created, ID, Subject, Metadata, Size, Attachments, Read, Tags`). + OrderBy("Created DESC"). Limit(limit). 
Offset(start) if err := q.QueryAndClose(nil, db, func(row *sql.Rows) { + var created int64 var id string - var summary string + var subject string + var metadata string + var size int + var attachments int var tags string var read int em := MessageSummary{} - if err := row.Scan(&id, &summary, &tags, &read); err != nil { + if err := row.Scan(&created, &id, &subject, &metadata, &size, &attachments, &read, &tags); err != nil { logger.Log().Error(err) return } - if err := json.Unmarshal([]byte(summary), &em); err != nil { + if err := json.Unmarshal([]byte(metadata), &em); err != nil { logger.Log().Error(err) return } @@ -303,11 +352,17 @@ func List(start, limit int) ([]MessageSummary, error) { return } + em.Created = time.UnixMilli(created) em.ID = id + em.Subject = subject + em.Size = size + em.Attachments = attachments em.Read = read == 1 results = append(results, em) + // logger.PrettyPrint(em) + }); err != nil { return results, err } @@ -342,19 +397,23 @@ func Search(search string, start, limit int) ([]MessageSummary, error) { q := searchParser(args, start, limit) if err := q.QueryAndClose(nil, db, func(row *sql.Rows) { + var created int64 var id string - var summary string + var subject string + var metadata string + var size int + var attachments int var tags string var read int var ignore string em := MessageSummary{} - if err := row.Scan(&id, &summary, &tags, &read, &ignore, &ignore, &ignore, &ignore, &ignore, &ignore); err != nil { + if err := row.Scan(&created, &id, &subject, &metadata, &size, &attachments, &read, &tags, &ignore, &ignore, &ignore, &ignore); err != nil { logger.Log().Error(err) return } - if err := json.Unmarshal([]byte(summary), &em); err != nil { + if err := json.Unmarshal([]byte(metadata), &em); err != nil { logger.Log().Error(err) return } @@ -364,7 +423,11 @@ func Search(search string, start, limit int) ([]MessageSummary, error) { return } + em.Created = time.UnixMilli(created) em.ID = id + em.Subject = subject + em.Size = size + 
em.Attachments = attachments em.Read = read == 1 results = append(results, em) @@ -404,6 +467,8 @@ func GetMessage(id string) (*Message, error) { from = &mail.Address{Name: env.GetHeader("From")} } + messageID := strings.Trim(env.GetHeader("Message-ID"), "<>") + returnPath := strings.Trim(env.GetHeader("Return-Path"), "<>") if returnPath == "" { returnPath = from.Address @@ -413,27 +478,20 @@ func GetMessage(id string) (*Message, error) { if err != nil { // return received datetime when message does not contain a date header q := sqlf.From("mailbox"). - Select(`Data`). - OrderBy("Sort DESC"). + Select(`Created`). Where(`ID = ?`, id) if err := q.QueryAndClose(nil, db, func(row *sql.Rows) { - var summary string - em := MessageSummary{} + var created int64 - if err := row.Scan(&summary); err != nil { - logger.Log().Error(err) - return - } - - if err := json.Unmarshal([]byte(summary), &em); err != nil { + if err := row.Scan(&created); err != nil { logger.Log().Error(err) return } logger.Log().Debugf("[db] %s does not contain a date header, using received datetime", id) - date = em.Created + date = time.UnixMilli(created) }); err != nil { logger.Log().Error(err) } @@ -441,6 +499,7 @@ func GetMessage(id string) (*Message, error) { obj := Message{ ID: id, + MessageID: messageID, Read: true, From: from, Date: date, @@ -821,3 +880,16 @@ func IsUnread(id string) bool { return unread == 1 } + +// MessageIDExists reports whether the mailbox holds a message with the given Message-ID (false if the query fails) +func MessageIDExists(id string) bool { + var total int + + q := sqlf.From("mailbox"). + Select("COUNT(*)").To(&total).
+ Where("MessageID = ?", id) + + _ = q.QueryRowAndClose(nil, db) + + return total != 0 +} diff --git a/storage/migrationTasks.go b/storage/migrationTasks.go new file mode 100644 index 0000000..dd919e7 --- /dev/null +++ b/storage/migrationTasks.go @@ -0,0 +1,200 @@ +package storage + +import ( + "bytes" + "context" + "database/sql" + "strings" + "time" + + "github.com/axllent/mailpit/config" + "github.com/axllent/mailpit/utils/logger" + "github.com/jhillyerd/enmime" + "github.com/leporo/sqlf" + "golang.org/x/text/language" + "golang.org/x/text/message" +) + +func dataMigrations() { + updateSortByCreatedTask() + assignMessageIDsTask() +} + +// Update Sort column using Created datetime <= v1.6.5 +// Migration task implemented 05/2023 - can be removed end 2023 +func updateSortByCreatedTask() { + q := sqlf.From("mailbox"). + Select("ID"). + Select(`json_extract(Metadata, '$.Created') as Created`). + Where("Created < ?", 1155000600) + + toUpdate := make(map[string]int64) + p := message.NewPrinter(language.English) + + if err := q.QueryAndClose(nil, db, func(row *sql.Rows) { + var id string + var ts sql.NullString + if err := row.Scan(&id, &ts); err != nil { + logger.Log().Error("[migration]", err) + return + } + + if !ts.Valid { + logger.Log().Errorf("[migration] cannot get Created timestamp from %s", id) + return + } + + t, _ := time.Parse(time.RFC3339Nano, ts.String) + toUpdate[id] = t.UnixMilli() + }); err != nil { + logger.Log().Error("[migration]", err) + return + } + + total := len(toUpdate) + + if total == 0 { + return + } + + logger.Log().Infof("[migration] updating timestamp for %s messages", p.Sprintf("%d", len(toUpdate))) + + // begin a transaction + ctx := context.Background() + tx, err := db.BeginTx(ctx, nil) + if err != nil { + logger.Log().Error("[migration]", err) + return + } + + // roll back if it fails + defer tx.Rollback() + + var blockTime = time.Now() + + count := 0 + for id, ts := range toUpdate { + count++ + _, err := tx.Exec(`UPDATE mailbox SET 
Created = ? WHERE ID = ?`, ts, id) + if err != nil { + logger.Log().Error("[migration]", err) + } + + if count%1000 == 0 { + percent := (100 * count) / total + logger.Log().Infof("[migration] updated timestamp for 1,000 messages [%d%%] in %s", percent, time.Since(blockTime)) + blockTime = time.Now() + } + } + + logger.Log().Infof("[migration] commit %s changes", p.Sprintf("%d", count)) + + if err := tx.Commit(); err != nil { + logger.Log().Error("[migration]", err) + return + } + + logger.Log().Infof("[migration] complete") +} + +// Find any messages without a stored Message-ID and update it <= v1.6.5 +// Migration task implemented 05/2023 - can be removed end 2023 +func assignMessageIDsTask() { + if !config.IgnoreDuplicateIDs { + return + } + + q := sqlf.From("mailbox"). + Select("ID"). + Where("MessageID = ''") + + missingIDS := make(map[string]string) + + if err := q.QueryAndClose(nil, db, func(row *sql.Rows) { + var id string + if err := row.Scan(&id); err != nil { + logger.Log().Error("[migration]", err) + return + } + missingIDS[id] = "" + }); err != nil { + logger.Log().Error("[migration]", err) + } + + if len(missingIDS) == 0 { + return + } + + var count int + var blockTime = time.Now() + p := message.NewPrinter(language.English) + + total := len(missingIDS) + + logger.Log().Infof("[migration] extracting Message-IDs for %s messages", p.Sprintf("%d", total)) + + for id := range missingIDS { + raw, err := GetMessageRaw(id) + if err != nil { + logger.Log().Error("[migration]", err) + continue + } + + r := bytes.NewReader(raw) + + env, err := enmime.ReadEnvelope(r) + if err != nil { + logger.Log().Error("[migration]", err) + continue + } + + messageID := strings.Trim(env.GetHeader("Message-ID"), "<>") + + missingIDS[id] = messageID + + count++ + + if count%1000 == 0 { + percent := (100 * count) / total + logger.Log().Infof("[migration] extracted 1,000 Message-IDs [%d%%] in %s", percent, time.Since(blockTime)) + blockTime = time.Now() + } + } + + // begin a 
transaction + ctx := context.Background() + tx, err := db.BeginTx(ctx, nil) + if err != nil { + logger.Log().Error("[migration]", err) + return + } + + // roll back if it fails + defer tx.Rollback() + + count = 0 + + for id, mid := range missingIDS { + _, err = tx.Exec(`UPDATE mailbox SET MessageID = ? WHERE ID = ?`, mid, id) + if err != nil { + logger.Log().Error("[migration]", err) + } + + count++ + + if count%1000 == 0 { + percent := (100 * count) / total + logger.Log().Infof("[migration] stored 1,000 Message-IDs [%d%%] in %s", percent, time.Since(blockTime)) + blockTime = time.Now() + } + } + + logger.Log().Infof("[migration] commit %s changes", p.Sprintf("%d", count)) + + if err := tx.Commit(); err != nil { + logger.Log().Error("[migration]", err) + return + } + + logger.Log().Infof("[migration] complete") +} diff --git a/storage/search.go b/storage/search.go index 75d98f4..abf6438 100644 --- a/storage/search.go +++ b/storage/search.go @@ -14,15 +14,13 @@ func searchParser(args []string, start, limit int) *sqlf.Stmt { } q := sqlf.From("mailbox"). - Select(`ID, Data, Tags, Read, - json_extract(Data, '$.To') as ToJSON, - json_extract(Data, '$.From') as FromJSON, - IFNULL(json_extract(Data, '$.Cc'), '{}') as CcJSON, - IFNULL(json_extract(Data, '$.Bcc'), '{}') as BccJSON, - json_extract(Data, '$.Subject') as Subject, - json_extract(Data, '$.Attachments') as Attachments + Select(`Created, ID, Subject, Metadata, Size, Attachments, Read, Tags, + IFNULL(json_extract(Metadata, '$.To'), '{}') as ToJSON, + IFNULL(json_extract(Metadata, '$.From'), '{}') as FromJSON, + IFNULL(json_extract(Metadata, '$.Cc'), '{}') as CcJSON, + IFNULL(json_extract(Metadata, '$.Bcc'), '{}') as BccJSON `). - OrderBy("Sort DESC"). + OrderBy("Created DESC"). Limit(limit). 
Offset(start) @@ -92,6 +90,15 @@ func searchParser(args []string, start, limit int) *sqlf.Stmt { q.Where("Subject LIKE ?", "%"+escPercentChar(w)+"%") } } + } else if strings.HasPrefix(w, "message-id:") { + w = cleanString(w[11:]) + if w != "" { + if exclude { + q.Where("MessageID NOT LIKE ?", "%"+escPercentChar(w)+"%") + } else { + q.Where("MessageID LIKE ?", "%"+escPercentChar(w)+"%") + } + } } else if strings.HasPrefix(w, "tag:") { w = cleanString(w[4:]) if w != "" { @@ -122,9 +129,9 @@ func searchParser(args []string, start, limit int) *sqlf.Stmt { } else { // search text if exclude { - q.Where("search NOT LIKE ?", "%"+cleanString(escPercentChar(w))+"%") + q.Where("SearchText NOT LIKE ?", "%"+cleanString(escPercentChar(w))+"%") } else { - q.Where("search LIKE ?", "%"+cleanString(escPercentChar(w))+"%") + q.Where("SearchText LIKE ?", "%"+cleanString(escPercentChar(w))+"%") } } } diff --git a/storage/structs.go b/storage/structs.go index e833e39..2b0f49f 100644 --- a/storage/structs.go +++ b/storage/structs.go @@ -11,8 +11,10 @@ import ( // // swagger:model Message type Message struct { - // Unique message database id + // Database ID ID string + // Message ID + MessageID string // Read status Read bool // From address @@ -25,7 +27,7 @@ type Message struct { Bcc []*mail.Address // ReplyTo addresses ReplyTo []*mail.Address - // ReturnPath is the Return-Path + // Return-Path ReturnPath string // Message subject Subject string @@ -49,15 +51,15 @@ type Message struct { // // swagger:model Attachment type Attachment struct { - // attachment part id + // Attachment part ID PartID string - // file name + // File name FileName string - // content type + // Content type ContentType string - // content id + // Content ID ContentID string - // size in bytes + // Size in bytes Size int } @@ -65,7 +67,7 @@ type Attachment struct { // // swagger:model MessageSummary type MessageSummary struct { - // Unique message database id + // Database ID ID string // Read status Read bool