Skip to content
Snippets Groups Projects
Verified Commit 0effaa91 authored by Louis's avatar Louis :fire:
Browse files

Merge branch 'feature/CLS' into develop

parents 3b390721 50d9fdef
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@ services:
volumes:
- ./.dck/redis:/data
labels:
uk.co.hackerfest.environment: 'staging'
tech.jetsam.environment: 'staging'
amqp:
image: 'rabbitmq:3-management'
hostname: jetsam_rabbit
......@@ -17,4 +17,4 @@ services:
volumes:
- ./.dck/rabbit:/data
labels:
uk.co.hackerfest.environment: 'staging'
\ No newline at end of file
tech.jetsam.environment: 'staging'
\ No newline at end of file
......@@ -6,6 +6,7 @@ const app = require('app')
const http = require('http')
const { config, env, fs, boot } = require('bootstrap')
const debug = require('debug')('server:boot')
const pkg = require('./package.json')
const Sentry = require('@sentry/node')
const Tracing = require("@sentry/tracing");
......@@ -19,6 +20,7 @@ function bindSentry(app) {
integrations: integrations => integrations.filter(itg => itg.name !== 'Console'),
environment: config('app.env'),
tracesSampleRate: config('sentry.samples'),
release: `${ pkg.name }@${ pkg.version }`
})
debug('Binding sentry to app level errors')
......
......@@ -13,6 +13,7 @@ if (url) {
username: connectionUrl.username,
password: connectionUrl.password,
port: connectionUrl.port,
log_queries: env('LOG_SQL_QUERIES', 'true') === 'true',
}
} else {
module.exports = {
......@@ -28,6 +29,7 @@ if (url) {
ca: Buffer.from(env('DATABASE_CA_CERT', null) ?? '', 'base64').toString(),
}
},
log_queries: env('LOG_SQL_QUERIES', 'true') === 'true',
}
}
......
function createServiceWithProfiling(service, ctx) {
const toReplace = new Set(service.profileMethods ?? [])
const svc = service.withContext(ctx)
......@@ -15,10 +14,38 @@ function createServiceWithProfiling(service, ctx) {
}
}
return Reflect.get(target, p, receiver)
}
},
})
}
function createFakeContext() {
const baseCtx = require('koa/lib/context')
const baseRes = require('koa/lib/response')
const baseReq = require('koa/lib/request')
const http = require('http')
const req = new http.IncomingMessage()
req.method = 'INTERNAL'
req.path = '_internal_'
const res = new http.ServerResponse(req)
res.statusCode = 200
const context = Object.create(baseCtx)
const request = context.request = Object.create(baseReq)
const response = context.response = Object.create(baseRes)
context.app = request.app = response.app = this
context.req = request.req = response.req = req
context.res = request.res = response.res = res
request.ctx = response.ctx = context
request.response = response
response.request = request
context.originalUrl = request.originalUrl = req.url
context.state = {}
return context
}
module.exports = class ServiceProvider {
static async attach(ctx, next) {
const services = [
......@@ -36,7 +63,7 @@ module.exports = class ServiceProvider {
services.forEach(service => {
const name = service.getServiceName()
if (ctx.services.hasOwnProperty(name)) {
console.warn(`Multiple services found for name: ${ name }. Using implementation provided by ${ service.name }`)
console.warn(`Multiple services found for name: ${name}. Using implementation provided by ${service.name}`)
}
ctx.services[name] = createServiceWithProfiling(service, ctx)
})
......@@ -53,7 +80,12 @@ module.exports = class ServiceProvider {
}
static /* async */ detached() {
const newContext = {}
const threadContext = require('core/injection/ThreadContext')
const newContext = createFakeContext()
newContext.profile = (...args) => {
return threadContext.profile(...args)
}
return ServiceProvider.attach(newContext, () => newContext)
}
}
\ No newline at end of file
const { AsyncLocalStorage } = require('async_hooks')
const sym = Symbol.for('jetsam_api_thread_context')
class ThreadContext extends AsyncLocalStorage {
run(cb) {
return super.run(new Map(), cb)
}
_if(cb) {
const store = this.getStore()
if (store) {
return cb(store)
}
return null
}
get(key) {
return this._if(store => store.get(key))
}
set(key, value) {
return this._if(store => store.set(key, value))
}
delete(key) {
return this._if(store => store.delete(key))
}
async profile(action, description, cb) {
const store = this.getStore()
if (store) {
const t = this.get('profiling')
if (t) {
const span = t.startChild({
description,
op: action,
})
try {
return await cb(span)
} finally {
span.finish()
}
}
}
return await cb(null)
}
getTransaction(ctx) {
const existing = this.get('profiling')
if (existing) {
return existing
}
const Sentry = require("@sentry/node")
const t = Sentry.startTransaction(ctx)
this.set('profiling', t)
return t
}
stopTransaction() {
const t = this.get('profiling')
if (t) {
t.finish()
this.delete('profiling')
}
return t
}
}
if (!global[sym]) {
global[sym] = new ThreadContext()
}
module.exports = global[sym]
const HANDLERS = [
// ['send-reset-email', require('domain/auth/jobs/SendPasswordResetEmail')]
['send-user-password-reset', require('domain/auth/handlers/SendUserPasswordReset')]
]
module.exports = function bindJobHandlers() {
......
......@@ -2,6 +2,8 @@ const { fs, config, env } = require('bootstrap')
const path = require('path')
const Sequelize = require('sequelize')
const { DataTypes } = Sequelize
const threadContext = require('core/injection/ThreadContext')
const { v4: uuid } = require('uuid')
if (env('CLI', false)) {
require('dotenv').config()
......@@ -39,6 +41,31 @@ Object.keys(db).forEach(modelName => {
}
})
if (config('database.log_queries')) {
sequelize.addHook('beforeQuery', (model, query) => {
const trace = threadContext.get('profiling')
if (trace) {
const traceId = uuid()
const span = trace.startChild({
op: 'sql.query',
})
threadContext.set(`span_${ traceId }`, span)
query.options.trace = traceId
}
})
sequelize.addHook('afterQuery', (model, query) => {
if (query.options.trace) {
const trace = threadContext.get(`span_${ query.options.trace }`)
if (trace) {
trace.description = query.sql
trace.setData('sql.params', query.options.bind)
trace.finish()
}
}
})
}
db.sequelize = sequelize
db.Sequelize = Sequelize
......
......@@ -54,7 +54,7 @@ module.exports = class AuthenticationService extends ContextualModule {
return this._user
}
} else if (this.ctx.get('Authorization')) {
const token = this.ctx.get('Authorization').substr(BEARER_PREFIX.length)
const token = this.ctx.get('Authorization').substr(HEADER_PREFIX.length)
const accessToken = await AccessToken.findOne({
where: { token },
include: [{ model: User }]
......
const { config } = require('bootstrap')
const HttpError = require('core/errors/HttpError')
module.exports = async (body, ctx) => {
const { email } = body
const user = await ctx.services['core.users'].findByEmail(email)
const token = await ctx.profile('user.generateResetToken', 'Create reset token', () => user.generateResetToken())
const name = user.name || 'Jetsam User (You haven\'t told us your name!)'
const reset_link = new URL(`/reset-password?token=${ token }`, config('app.host.web'))
const { mail } = require('services')
try {
await ctx.profile(
'services.mail.sendTemplate',
`template ${ config('mail.templates.reset-password')}`,
() => mail.sendTemplate(email, 'Reset Your Jetsam password', config('mail.templates.reset-password'), {
name,
reset_link,
})
)
} catch (e) {
// reporter.report(e)
console.log(e.response.body.errors)
throw new HttpError(
500,
'Failed To Send Reset Email',
{ status: 500, title: 'Failed to send reset email', description: 'Could not send the password reset email' },
{
sendgrid: (e.response?.body?.errors ?? []).reduce((acc, e, i) => ({
...acc,
[`err-${ i }`]: JSON.stringify(e, null, 2),
}), {}),
}
)
}
}
\ No newline at end of file
......@@ -36,43 +36,46 @@ exports.login = async ctx => {
}
exports.triggerPasswordReset = async ctx => {
const { queue } = require('services')
const { email } = ctx.request.body
const user = await ctx.services['core.users'].findByEmail(email)
if (!user) {
throw new HttpError(404, 'No Such Email', { status: 404, title: 'No Such Email', description: 'The provided email address is not associated with an account' })
}
const token = await ctx.profile('user.generateResetToken', 'Create reset token', () => user.generateResetToken())
const name = user.name || 'Jetsam User (You haven\'t told us your name!)'
const reset_link = new URL(`/reset-password?token=${ token }`, config('app.host.web'))
const { mail } = require('services')
try {
await ctx.profile(
'services.mail.sendTemplate',
`template ${ config('mail.templates.reset-password')}`,
() => mail.sendTemplate(email, 'Reset Your Jetsam password', config('mail.templates.reset-password'), {
name,
reset_link,
})
)
} catch (e) {
// reporter.report(e)
console.log(e.response.body.errors)
throw new HttpError(
500,
'Failed To Send Reset Email',
{ status: 500, title: 'Failed to send reset email', description: 'Could not send the password reset email' },
{
sendgrid: (e.response?.body?.errors ?? []).reduce((acc, e, i) => ({
...acc,
[`err-${ i }`]: JSON.stringify(e, null, 2),
}), {}),
}
)
}
await queue.dispatch('send-user-password-reset', { email })
// const token = await ctx.profile('user.generateResetToken', 'Create reset token', () => user.generateResetToken())
//
// const name = user.name || 'Jetsam User (You haven\'t told us your name!)'
// const reset_link = new URL(`/reset-password?token=${ token }`, config('app.host.web'))
//
// const { mail } = require('services')
//
// try {
// await ctx.profile(
// 'services.mail.sendTemplate',
// `template ${ config('mail.templates.reset-password')}`,
// () => mail.sendTemplate(email, 'Reset Your Jetsam password', config('mail.templates.reset-password'), {
// name,
// reset_link,
// })
// )
// } catch (e) {
// // reporter.report(e)
// console.log(e.response.body.errors)
// throw new HttpError(
// 500,
// 'Failed To Send Reset Email',
// { status: 500, title: 'Failed to send reset email', description: 'Could not send the password reset email' },
// {
// sendgrid: (e.response?.body?.errors ?? []).reduce((acc, e, i) => ({
// ...acc,
// [`err-${ i }`]: JSON.stringify(e, null, 2),
// }), {}),
// }
// )
// }
ctx.body = {
reset_token: null,
......
const Sentry = require("@sentry/node")
const { extractTraceparentData }= require("@sentry/tracing")
const threadContext = require('core/injection/ThreadContext')
module.exports = async (ctx, next) => {
let traceparentData
......@@ -7,7 +8,7 @@ module.exports = async (ctx, next) => {
traceparentData = extractTraceparentData(ctx.request.get("sentry-trace"));
}
const t = Sentry.startTransaction({
const t = threadContext.getTransaction({
op: 'http.request',
name: `[${ ctx.method }] ${ ctx.path }`,
traceparentData,
......@@ -22,6 +23,7 @@ module.exports = async (ctx, next) => {
t.setData('http.query', ctx.request.query)
ctx._profiling = t
threadContext.set('profiling', t)
ctx.profile = async (action, description, cb) => {
const span = t.startChild({
......@@ -41,6 +43,6 @@ module.exports = async (ctx, next) => {
} finally {
t.setName(`[${ ctx.method }] ${ ctx._matchedRouteName ?? ctx._matchedRoute ?? ctx.path }`)
t.setHttpStatus(ctx.status)
t.finish()
threadContext.stopTransaction()
}
}
\ No newline at end of file
const tc = require('core/injection/ThreadContext')
const { v4: uuid } = require('uuid')
module.exports = async (ctx, next) => {
return tc.run(async () => {
tc.set('rid', uuid())
await next()
ctx.response.set('x-jtrace', tc.get('rid'))
})
}
......@@ -6,6 +6,7 @@ const Router = require('@koa/router')
const multer = require('@koa/multer')
const upload = multer({ dest: '/tmp/' })
const context = require('http/middleware/ThreadContextWrapper')
const errors = require('http/middleware/ErrorHandler')
const includes = require('http/middleware/ParseIncludes')
const profiling = require('http/middleware/Profiler')
......@@ -45,6 +46,7 @@ const apiRouter = new Router({ prefix: '/api' })
const apiLegacy = new Router({ prefix: '/api/api' })
function mount(api) {
api.use(context)
api.use(profiling)
api.use(errors)
api.use(includes)
......
......@@ -4,6 +4,7 @@ const { config, env } = require('bootstrap')
const amqplib = require('amqplib/channel_api')
const debug = require('debug')('server:services:queue')
const Sentry = require('@sentry/node')
const threadContext = require('core/injection/ThreadContext')
class AmqpQueue extends Queue {
constructor() {
......@@ -20,9 +21,9 @@ class AmqpQueue extends Queue {
this._init = amqplib.connect(params)
this._amqp = null
this._channel = null
this._exchange = 'hackerfest.jobs.distributor'
this._queue = 'hackerfest.jobs.worker'
this.dlq = 'hackerfest.jobs.dlq'
this._exchange = 'jetsam.jobs.distributor'
this._queue = 'jetsam.jobs.worker'
this.dlq = 'jetsam.jobs.dlq'
this._handlers = {}
this._init.then(q => {
......@@ -38,7 +39,7 @@ class AmqpQueue extends Queue {
this._handlers[jobname] = handler
}
dispatch(jobname, payload, attempt = 0) {
return this.dispatchAfter(jobname, payload, 0, attempt)
return threadContext.profile('queue.dispatch', jobname, () => this.dispatchAfter(jobname, payload, 0, attempt))
}
dispatchAfter(jobname, payload, delay, attempt = 0) {
return this._initialise().then(() => {
......@@ -58,10 +59,22 @@ class AmqpQueue extends Queue {
this._initialise().then(() => {
debug('Starting to process messages on queue')
this._channel.consume(this._queue, async object => {
if (!object) {
return
}
await this._handleMessage(object)
return threadContext.run(async () => {
if (!object) {
return
}
try {
debug('Starting Trace')
threadContext.getTransaction({
op: 'queue.job',
name: 'AMQP Queue Handler'
})
await this._handleMessage(object)
} finally {
debug('Ending Trace')
threadContext.stopTransaction()
}
})
})
})
......@@ -71,18 +84,33 @@ class AmqpQueue extends Queue {
}
async _handleMessage(object) {
const ServiceProvider = require('core/injection/ServiceProvider')
const content = JSON.parse(object.content.toString())
const { type, payload: body, delay, attempt } = content
const t = threadContext.getTransaction({
op: 'queue.job',
})
t.setTag('job.type', type)
t.setData('job.attempt', attempt)
t.setData('job.body', body)
if (!this._handlers.hasOwnProperty(type)) {
t.setName(`[Q] Unknown`)
debug(`No handler for type ${ type }, discarding message`)
await this._channel.ack(object)
return
}
t.setName(`[Q] ${ type }`)
debug(`Processing message ${ type }`)
try {
await this._handlers[type](body)
const ctx = await ServiceProvider.detached()
const handler = this._handlers[type]
t.description = handler.name
await threadContext.profile('job.handler', undefined, () => handler(body, ctx))
await this._channel.ack(object)
debug(`Processed message ${ type }`)
} catch (e) {
......@@ -91,6 +119,7 @@ class AmqpQueue extends Queue {
Sentry.captureException(e)
})
debug(`Failed message ${ type }`)
debug(e)
if (attempt < 5) {
const next = attempt + 1
debug(`Re-queue message ${ type } #${ next }`)
......
process.title = 'Jetsam Queue Worker'
const { config, boot } = require('bootstrap')
const debug = require('debug')('server:worker:boot')
const Sentry = require('@sentry/node')
const bindQueue = require('core/utils/queue')
const pkg = require('./package.json')
let close = null
let done = false
async function main() {
await boot()
bindSentry()
const { queue } = require('services')
if (queue._initialise) {
await queue._initialise()
}
bindQueue()
close = await queue.listen()
await new Promise(async r => {
debug('Starting worker spin loop')
while (!done) { await new Promise(rr => setTimeout(rr, 10)) }
debug('Ending worker spin loop')
r(true)
})
}
function bindSentry() {
Sentry.init({
dsn: config('sentry.dsn'),
integrations: integrations => integrations.filter(itg => itg.name !== 'Console'),
environment: config('app.env'),
tracesSampleRate: config('sentry.samples'),
release: `${ pkg.name }@${ pkg.version }`
})
debug('Binding sentry to process level errors')
process.on("error", (err) => {
Sentry.captureException(err);
});
}
main()
.catch(e => {
console.error(e)
Sentry.captureException(e);
process.exit(1)
})
const cleanupsigs = [
'SIGINT',
'SIGTERM',
'SIGUSR2',
]
cleanupsigs.forEach(signal => {
process.on(signal, () => {
if (close) {
close()
}
done = true
Sentry.close(2000).then(() => {
process.exit(0)
})
})
})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment