mirror of
https://codeberg.org/forgejo/forgejo
synced 2024-11-24 18:56:11 +01:00
8eefe2af45
* Empty log queue on flush and close It is possible for log events to remain in the buffer off the multichannelledlog and thus not be logged despite close or flush. This PR simply adds a function to empty the queue before closing or flushing. (Except when the logger is paused.) Reference #19982 Signed-off-by: Andrew Thornton <art27@cantab.net> * and do similar for ChannelledLog Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
464 lines
10 KiB
Go
464 lines
10 KiB
Go
// Copyright 2019 The Gitea Authors. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package log
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime/pprof"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/process"
|
|
)
|
|
|
|
// Event represents a logging event
|
|
type Event struct {
|
|
level Level
|
|
msg string
|
|
caller string
|
|
filename string
|
|
line int
|
|
time time.Time
|
|
stacktrace string
|
|
}
|
|
|
|
// EventLogger represents the behaviours of a logger
|
|
type EventLogger interface {
|
|
LogEvent(event *Event) error
|
|
Close()
|
|
Flush()
|
|
GetLevel() Level
|
|
GetStacktraceLevel() Level
|
|
GetName() string
|
|
ReleaseReopen() error
|
|
}
|
|
|
|
// ChannelledLog represents a cached channel to a LoggerProvider
|
|
type ChannelledLog struct {
|
|
ctx context.Context
|
|
finished context.CancelFunc
|
|
name string
|
|
provider string
|
|
queue chan *Event
|
|
loggerProvider LoggerProvider
|
|
flush chan bool
|
|
close chan bool
|
|
closed chan bool
|
|
}
|
|
|
|
// NewChannelledLog a new logger instance with given logger provider and config.
|
|
func NewChannelledLog(parent context.Context, name, provider, config string, bufferLength int64) (*ChannelledLog, error) {
|
|
if log, ok := providers[provider]; ok {
|
|
|
|
l := &ChannelledLog{
|
|
queue: make(chan *Event, bufferLength),
|
|
flush: make(chan bool),
|
|
close: make(chan bool),
|
|
closed: make(chan bool),
|
|
}
|
|
l.loggerProvider = log()
|
|
if err := l.loggerProvider.Init(config); err != nil {
|
|
return nil, err
|
|
}
|
|
l.name = name
|
|
l.provider = provider
|
|
l.ctx, _, l.finished = process.GetManager().AddTypedContext(parent, fmt.Sprintf("Logger: %s(%s)", l.name, l.provider), process.SystemProcessType, false)
|
|
go l.Start()
|
|
return l, nil
|
|
}
|
|
return nil, ErrUnknownProvider{provider}
|
|
}
|
|
|
|
// Start processing the ChannelledLog
|
|
func (l *ChannelledLog) Start() {
|
|
pprof.SetGoroutineLabels(l.ctx)
|
|
defer l.finished()
|
|
for {
|
|
select {
|
|
case event, ok := <-l.queue:
|
|
if !ok {
|
|
l.closeLogger()
|
|
return
|
|
}
|
|
l.loggerProvider.LogEvent(event)
|
|
case _, ok := <-l.flush:
|
|
if !ok {
|
|
l.closeLogger()
|
|
return
|
|
}
|
|
l.emptyQueue()
|
|
l.loggerProvider.Flush()
|
|
case <-l.close:
|
|
l.emptyQueue()
|
|
l.closeLogger()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// LogEvent logs an event to this ChannelledLog
|
|
func (l *ChannelledLog) LogEvent(event *Event) error {
|
|
select {
|
|
case l.queue <- event:
|
|
return nil
|
|
case <-time.After(60 * time.Second):
|
|
// We're blocked!
|
|
return ErrTimeout{
|
|
Name: l.name,
|
|
Provider: l.provider,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *ChannelledLog) emptyQueue() bool {
|
|
for {
|
|
select {
|
|
case event, ok := <-l.queue:
|
|
if !ok {
|
|
return false
|
|
}
|
|
l.loggerProvider.LogEvent(event)
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *ChannelledLog) closeLogger() {
|
|
l.loggerProvider.Flush()
|
|
l.loggerProvider.Close()
|
|
l.closed <- true
|
|
}
|
|
|
|
// Close this ChannelledLog
|
|
func (l *ChannelledLog) Close() {
|
|
l.close <- true
|
|
<-l.closed
|
|
}
|
|
|
|
// Flush this ChannelledLog
|
|
func (l *ChannelledLog) Flush() {
|
|
l.flush <- true
|
|
}
|
|
|
|
// ReleaseReopen this ChannelledLog
|
|
func (l *ChannelledLog) ReleaseReopen() error {
|
|
return l.loggerProvider.ReleaseReopen()
|
|
}
|
|
|
|
// GetLevel gets the level of this ChannelledLog
|
|
func (l *ChannelledLog) GetLevel() Level {
|
|
return l.loggerProvider.GetLevel()
|
|
}
|
|
|
|
// GetStacktraceLevel gets the level of this ChannelledLog
|
|
func (l *ChannelledLog) GetStacktraceLevel() Level {
|
|
return l.loggerProvider.GetStacktraceLevel()
|
|
}
|
|
|
|
// GetName returns the name of this ChannelledLog
|
|
func (l *ChannelledLog) GetName() string {
|
|
return l.name
|
|
}
|
|
|
|
// MultiChannelledLog represents a cached channel to a LoggerProvider
|
|
type MultiChannelledLog struct {
|
|
ctx context.Context
|
|
finished context.CancelFunc
|
|
name string
|
|
bufferLength int64
|
|
queue chan *Event
|
|
rwmutex sync.RWMutex
|
|
loggers map[string]EventLogger
|
|
flush chan bool
|
|
close chan bool
|
|
started bool
|
|
level Level
|
|
stacktraceLevel Level
|
|
closed chan bool
|
|
paused chan bool
|
|
}
|
|
|
|
// NewMultiChannelledLog a new logger instance with given logger provider and config.
|
|
func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog {
|
|
ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Logger: %s", name), process.SystemProcessType, false)
|
|
|
|
m := &MultiChannelledLog{
|
|
ctx: ctx,
|
|
finished: finished,
|
|
name: name,
|
|
queue: make(chan *Event, bufferLength),
|
|
flush: make(chan bool),
|
|
bufferLength: bufferLength,
|
|
loggers: make(map[string]EventLogger),
|
|
level: NONE,
|
|
stacktraceLevel: NONE,
|
|
close: make(chan bool),
|
|
closed: make(chan bool),
|
|
paused: make(chan bool),
|
|
}
|
|
return m
|
|
}
|
|
|
|
// AddLogger adds a logger to this MultiChannelledLog
|
|
func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
|
|
m.rwmutex.Lock()
|
|
name := logger.GetName()
|
|
if _, has := m.loggers[name]; has {
|
|
m.rwmutex.Unlock()
|
|
return ErrDuplicateName{name}
|
|
}
|
|
m.loggers[name] = logger
|
|
if logger.GetLevel() < m.level {
|
|
m.level = logger.GetLevel()
|
|
}
|
|
if logger.GetStacktraceLevel() < m.stacktraceLevel {
|
|
m.stacktraceLevel = logger.GetStacktraceLevel()
|
|
}
|
|
m.rwmutex.Unlock()
|
|
go m.Start()
|
|
return nil
|
|
}
|
|
|
|
// DelLogger removes a sub logger from this MultiChannelledLog
|
|
// NB: If you delete the last sublogger this logger will simply drop
|
|
// log events
|
|
func (m *MultiChannelledLog) DelLogger(name string) bool {
|
|
m.rwmutex.Lock()
|
|
logger, has := m.loggers[name]
|
|
if !has {
|
|
m.rwmutex.Unlock()
|
|
return false
|
|
}
|
|
delete(m.loggers, name)
|
|
m.internalResetLevel()
|
|
m.rwmutex.Unlock()
|
|
logger.Flush()
|
|
logger.Close()
|
|
return true
|
|
}
|
|
|
|
// GetEventLogger returns a sub logger from this MultiChannelledLog
|
|
func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger {
|
|
m.rwmutex.RLock()
|
|
defer m.rwmutex.RUnlock()
|
|
return m.loggers[name]
|
|
}
|
|
|
|
// GetEventProvider returns a sub logger provider content from this MultiChannelledLog
|
|
func (m *MultiChannelledLog) GetLoggerProviderContent(name string) (string, error) {
|
|
channelledLogger := m.GetEventLogger(name).(*ChannelledLog)
|
|
return channelledLogger.loggerProvider.Content()
|
|
}
|
|
|
|
// GetEventLoggerNames returns a list of names
|
|
func (m *MultiChannelledLog) GetEventLoggerNames() []string {
|
|
m.rwmutex.RLock()
|
|
defer m.rwmutex.RUnlock()
|
|
var keys []string
|
|
for k := range m.loggers {
|
|
keys = append(keys, k)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
func (m *MultiChannelledLog) closeLoggers() {
|
|
m.rwmutex.Lock()
|
|
for _, logger := range m.loggers {
|
|
logger.Flush()
|
|
logger.Close()
|
|
}
|
|
m.rwmutex.Unlock()
|
|
m.closed <- true
|
|
}
|
|
|
|
// Pause pauses this Logger
|
|
func (m *MultiChannelledLog) Pause() {
|
|
m.paused <- true
|
|
}
|
|
|
|
// Resume resumes this Logger
|
|
func (m *MultiChannelledLog) Resume() {
|
|
m.paused <- false
|
|
}
|
|
|
|
// ReleaseReopen causes this logger to tell its subloggers to release and reopen
|
|
func (m *MultiChannelledLog) ReleaseReopen() error {
|
|
m.rwmutex.Lock()
|
|
defer m.rwmutex.Unlock()
|
|
var accumulatedErr error
|
|
for _, logger := range m.loggers {
|
|
if err := logger.ReleaseReopen(); err != nil {
|
|
if accumulatedErr == nil {
|
|
accumulatedErr = fmt.Errorf("Error whilst reopening: %s Error: %v", logger.GetName(), err)
|
|
} else {
|
|
accumulatedErr = fmt.Errorf("Error whilst reopening: %s Error: %v & %v", logger.GetName(), err, accumulatedErr)
|
|
}
|
|
}
|
|
}
|
|
return accumulatedErr
|
|
}
|
|
|
|
// Start processing the MultiChannelledLog
|
|
func (m *MultiChannelledLog) Start() {
|
|
m.rwmutex.Lock()
|
|
if m.started {
|
|
m.rwmutex.Unlock()
|
|
return
|
|
}
|
|
pprof.SetGoroutineLabels(m.ctx)
|
|
defer m.finished()
|
|
|
|
m.started = true
|
|
m.rwmutex.Unlock()
|
|
paused := false
|
|
for {
|
|
if paused {
|
|
select {
|
|
case paused = <-m.paused:
|
|
if !paused {
|
|
m.ResetLevel()
|
|
}
|
|
case _, ok := <-m.flush:
|
|
if !ok {
|
|
m.closeLoggers()
|
|
return
|
|
}
|
|
m.rwmutex.RLock()
|
|
for _, logger := range m.loggers {
|
|
logger.Flush()
|
|
}
|
|
m.rwmutex.RUnlock()
|
|
case <-m.close:
|
|
m.closeLoggers()
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
select {
|
|
case paused = <-m.paused:
|
|
if paused && m.level < INFO {
|
|
m.level = INFO
|
|
}
|
|
case event, ok := <-m.queue:
|
|
if !ok {
|
|
m.closeLoggers()
|
|
return
|
|
}
|
|
m.rwmutex.RLock()
|
|
for _, logger := range m.loggers {
|
|
err := logger.LogEvent(event)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
}
|
|
m.rwmutex.RUnlock()
|
|
case _, ok := <-m.flush:
|
|
if !ok {
|
|
m.closeLoggers()
|
|
return
|
|
}
|
|
m.emptyQueue()
|
|
m.rwmutex.RLock()
|
|
for _, logger := range m.loggers {
|
|
logger.Flush()
|
|
}
|
|
m.rwmutex.RUnlock()
|
|
case <-m.close:
|
|
m.emptyQueue()
|
|
m.closeLoggers()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *MultiChannelledLog) emptyQueue() bool {
|
|
for {
|
|
select {
|
|
case event, ok := <-m.queue:
|
|
if !ok {
|
|
return false
|
|
}
|
|
m.rwmutex.RLock()
|
|
for _, logger := range m.loggers {
|
|
err := logger.LogEvent(event)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
}
|
|
m.rwmutex.RUnlock()
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
// LogEvent logs an event to this MultiChannelledLog
|
|
func (m *MultiChannelledLog) LogEvent(event *Event) error {
|
|
select {
|
|
case m.queue <- event:
|
|
return nil
|
|
case <-time.After(100 * time.Millisecond):
|
|
// We're blocked!
|
|
return ErrTimeout{
|
|
Name: m.name,
|
|
Provider: "MultiChannelledLog",
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close this MultiChannelledLog
|
|
func (m *MultiChannelledLog) Close() {
|
|
m.close <- true
|
|
<-m.closed
|
|
}
|
|
|
|
// Flush this ChannelledLog
|
|
func (m *MultiChannelledLog) Flush() {
|
|
m.flush <- true
|
|
}
|
|
|
|
// GetLevel gets the level of this MultiChannelledLog
|
|
func (m *MultiChannelledLog) GetLevel() Level {
|
|
m.rwmutex.RLock()
|
|
defer m.rwmutex.RUnlock()
|
|
return m.level
|
|
}
|
|
|
|
// GetStacktraceLevel gets the level of this MultiChannelledLog
|
|
func (m *MultiChannelledLog) GetStacktraceLevel() Level {
|
|
m.rwmutex.RLock()
|
|
defer m.rwmutex.RUnlock()
|
|
return m.stacktraceLevel
|
|
}
|
|
|
|
func (m *MultiChannelledLog) internalResetLevel() Level {
|
|
m.level = NONE
|
|
for _, logger := range m.loggers {
|
|
level := logger.GetLevel()
|
|
if level < m.level {
|
|
m.level = level
|
|
}
|
|
level = logger.GetStacktraceLevel()
|
|
if level < m.stacktraceLevel {
|
|
m.stacktraceLevel = level
|
|
}
|
|
}
|
|
return m.level
|
|
}
|
|
|
|
// ResetLevel will reset the level of this MultiChannelledLog
|
|
func (m *MultiChannelledLog) ResetLevel() Level {
|
|
m.rwmutex.Lock()
|
|
defer m.rwmutex.Unlock()
|
|
return m.internalResetLevel()
|
|
}
|
|
|
|
// GetName gets the name of this MultiChannelledLog
|
|
func (m *MultiChannelledLog) GetName() string {
|
|
return m.name
|
|
}
|