mirror of
https://github.com/amigan/aim-oscar-server.git
synced 2025-01-31 04:52:37 -05:00
added message deliveriny goroutine
This commit is contained in:
parent
3e69ba3e4c
commit
7b37595f2b
6 changed files with 92 additions and 10 deletions
|
@ -22,7 +22,7 @@ func init() {
|
|||
|
||||
type GenericServiceControls struct{}
|
||||
|
||||
func (g *GenericServiceControls) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC) (context.Context, error) {
|
||||
func (g *GenericServiceControls) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC, comm chan *models.Message) (context.Context, error) {
|
||||
session, _ := oscar.SessionFromContext(ctx)
|
||||
|
||||
switch snac.Header.Subtype {
|
||||
|
@ -83,7 +83,7 @@ func (g *GenericServiceControls) HandleSNAC(ctx context.Context, db *bun.DB, sna
|
|||
|
||||
tlvs := []*oscar.TLV{
|
||||
oscar.NewTLV(0x01, util.Dword(0x80)), // User Class
|
||||
oscar.NewTLV(0x06, util.Dword(0x0001|0x0100)), // User Status
|
||||
oscar.NewTLV(0x06, util.Dword(0x0001|0x0100)), // TODO: User Status
|
||||
oscar.NewTLV(0x0a, util.Dword(binary.BigEndian.Uint32([]byte(SRV_HOST)))), // External IP
|
||||
oscar.NewTLV(0x0f, util.Dword(uint32(time.Since(user.LastActivityAt).Seconds()))), // Idle Time
|
||||
oscar.NewTLV(0x03, util.Dword(uint32(time.Now().Unix()))), // Client Signon Time
|
||||
|
|
|
@ -46,7 +46,7 @@ type channel struct {
|
|||
Unknown uint16
|
||||
}
|
||||
|
||||
func (icbm *ICBM) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC) (context.Context, error) {
|
||||
func (icbm *ICBM) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC, comm chan *models.Message) (context.Context, error) {
|
||||
session, _ := oscar.SessionFromContext(ctx)
|
||||
|
||||
switch snac.Header.Subtype {
|
||||
|
@ -173,10 +173,14 @@ func (icbm *ICBM) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC)
|
|||
return ctx, errors.New("read insufficient data from message fragment")
|
||||
}
|
||||
|
||||
if err = models.InsertMessage(ctx, db, msgID, user.Username, to, string(messageContents)); err != nil {
|
||||
message, err := models.InsertMessage(ctx, db, msgID, user.Username, to, string(messageContents))
|
||||
if err != nil {
|
||||
return ctx, errors.Wrap(err, "could not insert message")
|
||||
}
|
||||
|
||||
// Fire the message off into the communication channel to get delivered
|
||||
comm <- message
|
||||
|
||||
// The Client usually wants a response that the server got the message. It checks that the message
|
||||
// back has the same message ID that was sent and the user it was sent to.
|
||||
ackTLV := oscar.FindTLV(tlvs, 3)
|
||||
|
|
|
@ -74,7 +74,7 @@ func (a *AuthorizationRegistrationService) GenerateCipher() string {
|
|||
return base32.StdEncoding.EncodeToString(randomBytes)[:CIPHER_LENGTH]
|
||||
}
|
||||
|
||||
func (a *AuthorizationRegistrationService) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC) (context.Context, error) {
|
||||
func (a *AuthorizationRegistrationService) HandleSNAC(ctx context.Context, db *bun.DB, snac *oscar.SNAC, comm chan *models.Message) (context.Context, error) {
|
||||
session, err := oscar.SessionFromContext(ctx)
|
||||
if err != nil {
|
||||
util.PanicIfError(err)
|
||||
|
|
65
main.go
65
main.go
|
@ -31,8 +31,12 @@ const (
|
|||
|
||||
var services map[uint16]Service
|
||||
|
||||
// Map username to session
|
||||
var sessions map[string]*oscar.Session
|
||||
|
||||
func init() {
|
||||
services = make(map[uint16]Service)
|
||||
sessions = make(map[string]*oscar.Session)
|
||||
}
|
||||
|
||||
func RegisterService(family uint16, service Service) {
|
||||
|
@ -69,6 +73,63 @@ func main() {
|
|||
}
|
||||
defer listener.Close()
|
||||
|
||||
// Goroutine that listens for messages to deliver and tries to find a user socket to push them to
|
||||
commCh := make(chan *models.Message, 1)
|
||||
go func() {
|
||||
for {
|
||||
message, more := <-commCh
|
||||
if !more {
|
||||
log.Printf("message delivery routine shutting down")
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("got a message: %s", message)
|
||||
if s, ok := sessions[message.To]; ok {
|
||||
messageSnac := oscar.NewSNAC(4, 7)
|
||||
messageSnac.Data.WriteUint64(message.MessageID)
|
||||
messageSnac.Data.WriteUint16(1)
|
||||
messageSnac.Data.WriteLPString(message.From)
|
||||
messageSnac.Data.WriteUint16(0) // TODO: sender's warning level
|
||||
|
||||
tlvs := []*oscar.TLV{
|
||||
oscar.NewTLV(1, util.Word(0x80)), // TODO: user class
|
||||
oscar.NewTLV(6, util.Dword(0x0001|0x0100)), // TODO: user status
|
||||
oscar.NewTLV(0xf, util.Dword(0)), // TODO: user idle time
|
||||
oscar.NewTLV(3, util.Dword(0)), // TODO: user creation time
|
||||
// oscar.NewTLV(4, []byte{}), // TODO: this TLV appears in automated responses like away messages
|
||||
}
|
||||
|
||||
// Length of TLVs in fixed part
|
||||
messageSnac.Data.WriteUint16(uint16(len(tlvs)))
|
||||
|
||||
frag := oscar.Buffer{}
|
||||
frag.Write([]byte{5, 1, 0, 4, 1, 1, 1, 1}) // TODO: first fragment [id, version, len, len, (cap * len)... ]
|
||||
frag.Write([]byte{1, 1}) // message text fragment start (this is a busted "TLV")
|
||||
frag.Write(util.Word(uint16(len(message.Contents) + 4))) // length of TLV
|
||||
frag.Write([]byte{0, 0, 0, 0}) // TODO: message charset number, message charset subset
|
||||
frag.WriteString(message.Contents)
|
||||
|
||||
// Append the fragments
|
||||
messageSnac.Data.Write(frag.Bytes())
|
||||
|
||||
messageFlap := oscar.NewFLAP(2)
|
||||
messageFlap.Data.WriteBinary(messageSnac)
|
||||
if err := s.Send(messageFlap); err != nil {
|
||||
log.Panicf("could not deliver message %d: %s", message.MessageID, err.Error())
|
||||
continue
|
||||
} else {
|
||||
log.Printf("sent message %d to user %s at %s", message.MessageID, message.To, s.RemoteAddr())
|
||||
}
|
||||
|
||||
if err := message.MarkDelivered(context.Background(), db); err != nil {
|
||||
log.Panicf("could not mark message %d as delivered: %s", message.MessageID, err.Error())
|
||||
}
|
||||
} else {
|
||||
log.Printf("could not find session for user %s", message.To)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
handleCloseFn := func(ctx context.Context, session *oscar.Session) {
|
||||
log.Printf("%v disconnected", session.RemoteAddr())
|
||||
|
||||
|
@ -91,6 +152,7 @@ func main() {
|
|||
fmt.Printf("%s (%v) ->\n%+v\n", user.Username, session.RemoteAddr(), flap)
|
||||
user.LastActivityAt = time.Now()
|
||||
ctx = models.NewContextWithUser(ctx, user)
|
||||
sessions[user.Username] = session
|
||||
} else {
|
||||
fmt.Printf("%v ->\n%+v\n", session.RemoteAddr(), flap)
|
||||
}
|
||||
|
@ -132,7 +194,7 @@ func main() {
|
|||
}
|
||||
|
||||
if service, ok := services[snac.Header.Family]; ok {
|
||||
newCtx, err := service.HandleSNAC(ctx, db, snac)
|
||||
newCtx, err := service.HandleSNAC(ctx, db, snac, commCh)
|
||||
util.PanicIfError(err)
|
||||
return newCtx
|
||||
}
|
||||
|
@ -154,6 +216,7 @@ func main() {
|
|||
signal.Notify(exitChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT)
|
||||
go func() {
|
||||
<-exitChan
|
||||
close(commCh)
|
||||
fmt.Println("Shutting down")
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
|
|
@ -2,6 +2,7 @@ package models
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -18,15 +19,28 @@ type Message struct {
|
|||
DeliveredAt time.Time `bun:",nullzero"`
|
||||
}
|
||||
|
||||
func InsertMessage(ctx context.Context, db *bun.DB, messageId uint64, from string, to string, contents string) error {
|
||||
func InsertMessage(ctx context.Context, db *bun.DB, messageId uint64, from string, to string, contents string) (*Message, error) {
|
||||
msg := &Message{
|
||||
MessageID: messageId,
|
||||
From: from,
|
||||
To: to,
|
||||
Contents: contents,
|
||||
}
|
||||
if _, err := db.NewInsert().Model(msg).Exec(ctx); err != nil {
|
||||
return errors.Wrap(err, "could not update user")
|
||||
if _, err := db.NewInsert().Model(msg).Exec(ctx, msg); err != nil {
|
||||
return nil, errors.Wrap(err, "could not update user")
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
func (m *Message) String() string {
|
||||
return fmt.Sprintf("<Message from=%s to=%s content=\"%s\">", m.From, m.To, m.Contents)
|
||||
}
|
||||
|
||||
func (m *Message) MarkDelivered(ctx context.Context, db *bun.DB) error {
|
||||
m.DeliveredAt = time.Now()
|
||||
if _, err := db.NewUpdate().Model(m).Where("message_id = ?", m.MessageID).Exec(ctx); err != nil {
|
||||
return errors.Wrap(err, "could not mark message as updated")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"aim-oscar/models"
|
||||
"aim-oscar/oscar"
|
||||
"context"
|
||||
|
||||
|
@ -8,5 +9,5 @@ import (
|
|||
)
|
||||
|
||||
type Service interface {
|
||||
HandleSNAC(context.Context, *bun.DB, *oscar.SNAC) (context.Context, error)
|
||||
HandleSNAC(context.Context, *bun.DB, *oscar.SNAC, chan *models.Message) (context.Context, error)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue