11import http from "node:http" ;
22import { URL } from "node:url" ;
3+ import {
4+ createWebhookInFlightLimiter ,
5+ WEBHOOK_BODY_READ_DEFAULTS ,
6+ } from "openclaw/plugin-sdk/webhook-ingress" ;
37import {
48 isRequestBodyLimitError ,
59 readRequestBodyWithLimit ,
610 requestBodyErrorToText ,
711} from "../api.js" ;
812import { normalizeVoiceCallConfig , type VoiceCallConfig } from "./config.js" ;
913import type { CoreAgentDeps , CoreConfig } from "./core-bridge.js" ;
14+ import { getHeader } from "./http-headers.js" ;
1015import type { CallManager } from "./manager.js" ;
1116import type { MediaStreamConfig } from "./media-stream.js" ;
1217import { MediaStreamHandler } from "./media-stream.js" ;
@@ -16,10 +21,18 @@ import type { TwilioProvider } from "./providers/twilio.js";
1621import type { CallRecord , NormalizedEvent , WebhookContext } from "./types.js" ;
1722import { startStaleCallReaper } from "./webhook/stale-call-reaper.js" ;
1823
19- const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024 ;
24+ const MAX_WEBHOOK_BODY_BYTES = WEBHOOK_BODY_READ_DEFAULTS . preAuth . maxBytes ;
25+ const WEBHOOK_BODY_TIMEOUT_MS = WEBHOOK_BODY_READ_DEFAULTS . preAuth . timeoutMs ;
2026const STREAM_DISCONNECT_HANGUP_GRACE_MS = 2000 ;
2127const TRANSCRIPT_LOG_MAX_CHARS = 200 ;
2228
29+ type WebhookHeaderGateResult =
30+ | { ok : true }
31+ | {
32+ ok : false ;
33+ reason : string ;
34+ } ;
35+
2336function sanitizeTranscriptForLog ( value : string ) : string {
2437 const sanitized = value
2538 . replace ( / [ \u0000 - \u001f \u007f ] / g, " " )
@@ -70,6 +83,7 @@ export class VoiceCallWebhookServer {
7083 private coreConfig : CoreConfig | null ;
7184 private agentRuntime : CoreAgentDeps | null ;
7285 private stopStaleCallReaper : ( ( ) => void ) | null = null ;
86+ private readonly webhookInFlightLimiter = createWebhookInFlightLimiter ( ) ;
7387
7488 /** Media stream handler for bidirectional audio (when streaming enabled) */
7589 private mediaStreamHandler : MediaStreamHandler | null = null ;
@@ -350,6 +364,7 @@ export class VoiceCallWebhookServer {
350364 clearTimeout ( timer ) ;
351365 }
352366 this . pendingDisconnectHangups . clear ( ) ;
367+ this . webhookInFlightLimiter . clear ( ) ;
353368
354369 if ( this . stopStaleCallReaper ) {
355370 this . stopStaleCallReaper ( ) ;
@@ -444,49 +459,100 @@ export class VoiceCallWebhookServer {
444459 return { statusCode : 405 , body : "Method Not Allowed" } ;
445460 }
446461
447- let body = "" ;
462+ const headerGate = this . verifyPreAuthWebhookHeaders ( req . headers ) ;
463+ if ( ! headerGate . ok ) {
464+ console . warn ( `[voice-call] Webhook rejected before body read: ${ headerGate . reason } ` ) ;
465+ return { statusCode : 401 , body : "Unauthorized" } ;
466+ }
467+
468+ const inFlightKey = req . socket . remoteAddress ?? "" ;
469+ if ( ! this . webhookInFlightLimiter . tryAcquire ( inFlightKey ) ) {
470+ console . warn ( `[voice-call] Webhook rejected before body read: too many in-flight requests` ) ;
471+ return { statusCode : 429 , body : "Too Many Requests" } ;
472+ }
473+
448474 try {
449- body = await this . readBody ( req , MAX_WEBHOOK_BODY_BYTES ) ;
450- } catch ( err ) {
451- if ( isRequestBodyLimitError ( err , "PAYLOAD_TOO_LARGE" ) ) {
452- return { statusCode : 413 , body : "Payload Too Large" } ;
453- }
454- if ( isRequestBodyLimitError ( err , "REQUEST_BODY_TIMEOUT" ) ) {
455- return { statusCode : 408 , body : requestBodyErrorToText ( "REQUEST_BODY_TIMEOUT" ) } ;
475+ let body = "" ;
476+ try {
477+ body = await this . readBody ( req , MAX_WEBHOOK_BODY_BYTES , WEBHOOK_BODY_TIMEOUT_MS ) ;
478+ } catch ( err ) {
479+ if ( isRequestBodyLimitError ( err , "PAYLOAD_TOO_LARGE" ) ) {
480+ return { statusCode : 413 , body : "Payload Too Large" } ;
481+ }
482+ if ( isRequestBodyLimitError ( err , "REQUEST_BODY_TIMEOUT" ) ) {
483+ return { statusCode : 408 , body : requestBodyErrorToText ( "REQUEST_BODY_TIMEOUT" ) } ;
484+ }
485+ throw err ;
456486 }
457- throw err ;
458- }
459487
460- const ctx : WebhookContext = {
461- headers : req . headers as Record < string , string | string [ ] | undefined > ,
462- rawBody : body ,
463- url : url . toString ( ) ,
464- method : "POST" ,
465- query : Object . fromEntries ( url . searchParams ) ,
466- remoteAddress : req . socket . remoteAddress ?? undefined ,
467- } ;
488+ const ctx : WebhookContext = {
489+ headers : req . headers as Record < string , string | string [ ] | undefined > ,
490+ rawBody : body ,
491+ url : url . toString ( ) ,
492+ method : "POST" ,
493+ query : Object . fromEntries ( url . searchParams ) ,
494+ remoteAddress : req . socket . remoteAddress ?? undefined ,
495+ } ;
468496
469- const verification = this . provider . verifyWebhook ( ctx ) ;
470- if ( ! verification . ok ) {
471- console . warn ( `[voice-call] Webhook verification failed: ${ verification . reason } ` ) ;
472- return { statusCode : 401 , body : "Unauthorized" } ;
473- }
474- if ( ! verification . verifiedRequestKey ) {
475- console . warn ( "[voice-call] Webhook verification succeeded without request identity key" ) ;
476- return { statusCode : 401 , body : "Unauthorized" } ;
477- }
497+ const verification = this . provider . verifyWebhook ( ctx ) ;
498+ if ( ! verification . ok ) {
499+ console . warn ( `[voice-call] Webhook verification failed: ${ verification . reason } ` ) ;
500+ return { statusCode : 401 , body : "Unauthorized" } ;
501+ }
502+ if ( ! verification . verifiedRequestKey ) {
503+ console . warn ( "[voice-call] Webhook verification succeeded without request identity key" ) ;
504+ return { statusCode : 401 , body : "Unauthorized" } ;
505+ }
478506
479- const parsed = this . provider . parseWebhookEvent ( ctx , {
480- verifiedRequestKey : verification . verifiedRequestKey ,
481- } ) ;
507+ const parsed = this . provider . parseWebhookEvent ( ctx , {
508+ verifiedRequestKey : verification . verifiedRequestKey ,
509+ } ) ;
510+
511+ if ( verification . isReplay ) {
512+ console . warn ( "[voice-call] Replay detected; skipping event side effects" ) ;
513+ } else {
514+ this . processParsedEvents ( parsed . events ) ;
515+ }
482516
483- if ( verification . isReplay ) {
484- console . warn ( "[voice-call] Replay detected; skipping event side effects" ) ;
485- } else {
486- this . processParsedEvents ( parsed . events ) ;
517+ return normalizeWebhookResponse ( parsed ) ;
518+ } finally {
519+ this . webhookInFlightLimiter . release ( inFlightKey ) ;
487520 }
521+ }
488522
489- return normalizeWebhookResponse ( parsed ) ;
523+ private verifyPreAuthWebhookHeaders ( headers : http . IncomingHttpHeaders ) : WebhookHeaderGateResult {
524+ if ( this . config . skipSignatureVerification ) {
525+ return { ok : true } ;
526+ }
527+ switch ( this . provider . name ) {
528+ case "telnyx" : {
529+ const signature = getHeader ( headers , "telnyx-signature-ed25519" ) ;
530+ const timestamp = getHeader ( headers , "telnyx-timestamp" ) ;
531+ if ( signature && timestamp ) {
532+ return { ok : true } ;
533+ }
534+ return { ok : false , reason : "missing Telnyx signature or timestamp header" } ;
535+ }
536+ case "twilio" :
537+ if ( getHeader ( headers , "x-twilio-signature" ) ) {
538+ return { ok : true } ;
539+ }
540+ return { ok : false , reason : "missing X-Twilio-Signature header" } ;
541+ case "plivo" : {
542+ const hasV3 =
543+ Boolean ( getHeader ( headers , "x-plivo-signature-v3" ) ) &&
544+ Boolean ( getHeader ( headers , "x-plivo-signature-v3-nonce" ) ) ;
545+ const hasV2 =
546+ Boolean ( getHeader ( headers , "x-plivo-signature-v2" ) ) &&
547+ Boolean ( getHeader ( headers , "x-plivo-signature-v2-nonce" ) ) ;
548+ if ( hasV3 || hasV2 ) {
549+ return { ok : true } ;
550+ }
551+ return { ok : false , reason : "missing Plivo signature headers" } ;
552+ }
553+ default :
554+ return { ok : true } ;
555+ }
490556 }
491557
492558 private processParsedEvents ( events : NormalizedEvent [ ] ) : void {
@@ -515,7 +581,7 @@ export class VoiceCallWebhookServer {
515581 private readBody (
516582 req : http . IncomingMessage ,
517583 maxBytes : number ,
518- timeoutMs = 30_000 ,
584+ timeoutMs = WEBHOOK_BODY_TIMEOUT_MS ,
519585 ) : Promise < string > {
520586 return readRequestBodyWithLimit ( req , { maxBytes, timeoutMs } ) ;
521587 }
0 commit comments