Skip to content

vllm.tool_parsers.deepseekv32_tool_parser

DeepSeekV32ToolParser

Bases: ToolParser

example tool call content: <|DSML|function_calls> <|DSML|invoke name="get_weather"> <|DSML|parameter name="location" string="true">杭州</|DSML|parameter> <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter> </|DSML|invoke> <|DSML|invoke name="get_weather"> <|DSML|parameter name="location" string="true">北京</|DSML|parameter> <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter> </|DSML|invoke> </|DSML|function_calls>

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
class DeepSeekV32ToolParser(ToolParser):
    """Tool-call parser for DSML-style ``function_calls`` markup.

    Offers one-shot extraction from a finished completion
    (``extract_tool_calls``) and incremental extraction while the completion
    is still being generated (``extract_tool_calls_streaming``).

    Example tool call content:
    <|DSML|function_calls>
    <|DSML|invoke name="get_weather">
    <|DSML|parameter name="location" string="true">杭州</|DSML|parameter>
    <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter>
    </|DSML|invoke>
    <|DSML|invoke name="get_weather">
    <|DSML|parameter name="location" string="true">北京</|DSML|parameter>
    <|DSML|parameter name="date" string="true">2024-01-16</|DSML|parameter>
    </|DSML|invoke>
    </|DSML|function_calls>
    """

    # Literal markers that open / close a whole group of invocations.
    tool_call_start_token: str = "<|DSML|function_calls>"
    tool_call_end_token: str = "</|DSML|function_calls>"

    def __init__(self, tokenizer: TokenizerLike, tools: list[Tool] | None = None):
        """Set up streaming state and compile the DSML regular expressions.

        Args:
            tokenizer: Tokenizer forwarded to ``ToolParser.__init__``.
            tools: Optional tool definitions; their schemas drive the type
                coercion of non-string parameters.

        Raises:
            ValueError: If ``self.model_tokenizer`` is falsy after the base
                class has been initialised.
        """
        super().__init__(tokenizer, tools)

        # One {"name", "arguments"} entry per tool call seen in this stream.
        self.prev_tool_call_arr: list[dict] = []
        # NOTE(review): ``self.streamed_args_for_tool`` is used throughout but
        # never assigned here; presumably ToolParser.__init__ creates it.

        # Streaming state
        self.current_tool_index: int = 0  # index handed to the next invoke
        self._sent_content_idx: int = 0  # only ever reset inside this class
        self._buffer: str = ""  # text received but not yet consumed
        self._in_tool_calls: bool = False  # inside a function_calls block
        self._active_tool_index: int | None = None  # invoke currently open
        self._active_tool_name: str | None = None
        self._active_param_name: str | None = None  # parameter currently open
        self._active_param_string_attr: str | None = None  # "true" / "false"
        # One of "wrapper", "buffered", "string", "raw", or None when no
        # parameter is open.
        self._active_param_mode: str | None = None
        self._active_param_parts: list[str] = []  # chunks of a buffered value
        self._args_started: list[bool] = []  # per tool: has "{" been emitted

        # Regex patterns for complete parsing
        self.tool_call_complete_regex = re.compile(
            re.escape(self.tool_call_start_token)
            + r"(.*?)"
            + re.escape(self.tool_call_end_token),
            re.DOTALL,
        )

        # The tag prefixes contain "|", which is the alternation operator in
        # a regular expression.  They must be escaped: left raw, a pattern
        # such as '<|DSML|invoke...' is satisfied by a lone "<" and its
        # capture groups come back empty.
        dsml_open = re.escape("<|DSML|")
        dsml_close = re.escape("</|DSML|")
        invoke_open = dsml_open + r'invoke\s+name="([^"]+)"\s*>'
        parameter_open = (
            dsml_open + r'parameter\s+name="([^"]+)"\s+string="(true|false)"\s*>'
        )

        # Groups: (invoke name, invoke body).
        self.invoke_complete_regex = re.compile(
            invoke_open + r"(.*?)" + dsml_close + r"invoke>", re.DOTALL
        )
        # Groups: (parameter name, string attribute, raw value).
        self.parameter_complete_regex = re.compile(
            parameter_open + r"(.*?)" + dsml_close + r"parameter>",
            re.DOTALL,
        )
        # Opening tags only; used by the streaming state machine.
        self.invoke_start_regex = re.compile(invoke_open)
        self.parameter_start_regex = re.compile(parameter_open)

        if not self.model_tokenizer:
            raise ValueError(
                "The model tokenizer must be passed to the ToolParser "
                "constructor during construction."
            )

        logger.debug(
            "vLLM Successfully import tool parser %s !", self.__class__.__name__
        )

    def adjust_request(
        self, request: ChatCompletionRequest | ResponsesRequest
    ) -> ChatCompletionRequest | ResponsesRequest:
        """Apply the base adjustments and keep tool-call tokens in the output.

        Args:
            request: The incoming request.

        Returns:
            The request returned by ``super().adjust_request``, with
            ``skip_special_tokens`` set to ``False`` when tools are supplied
            and ``tool_choice`` is not ``"none"``.
        """
        request = super().adjust_request(request)
        if request.tools and request.tool_choice != "none":
            # Ensure tool call tokens
            # (e.g. <|DSML|function_calls>, </|DSML|function_calls>)
            # are not skipped during decoding.
            # Even though they are not marked as special tokens,
            # setting skip_special_tokens=False ensures proper handling in
            # transformers 5.x where decoding behavior may have changed.
            request.skip_special_tokens = False
        return request

    def _generate_tool_call_id(self) -> str:
        """Generate a unique tool call ID (``call_`` + 24 hex characters)."""
        return f"call_{uuid.uuid4().hex[:24]}"

    def _parse_invoke_params(self, invoke_str: str) -> dict[str, tuple[str, str]]:
        """Collect every complete parameter element inside one invoke body.

        Args:
            invoke_str: Text found between an invoke's opening and closing tag.

        Returns:
            Mapping of parameter name to ``(raw value, string attribute)``.
            A repeated name keeps the last occurrence.
        """
        param_dict: dict[str, tuple[str, str]] = {}
        for param_name, string_attr, param_val in self.parameter_complete_regex.findall(
            invoke_str
        ):
            param_dict[param_name] = (param_val, string_attr)
        return param_dict

    @staticmethod
    def _repair_param_dict(
        param_dict: dict[str, Any],
        param_config: dict[str, Any],
    ) -> dict[str, Any]:
        """Unwrap single 'arguments' / 'input' wrappers when the wrapper
        is not part of the requested tool schema and the wrapped object
        matches the schema fields.

        Args:
            param_dict: Converted parameters of one tool call.
            param_config: Schema properties of the tool, keyed by name.

        Returns:
            The wrapped object when unwrapping applies, otherwise
            ``param_dict`` unchanged.
        """
        allowed = set(param_config.keys())
        for wrapper in ("arguments", "input"):
            # Only a dict whose sole key is the wrapper qualifies, and only
            # when the schema itself does not declare that key.
            if set(param_dict.keys()) != {wrapper} or wrapper in allowed:
                continue
            inner = param_dict[wrapper]
            if isinstance(inner, str):
                try:
                    inner = json.loads(inner)
                except json.JSONDecodeError:
                    return param_dict
            if isinstance(inner, dict) and set(inner.keys()).issubset(allowed):
                return inner
        return param_dict

    def _convert_params_with_schema(
        self,
        function_name: str,
        param_dict: dict[str, tuple[str, str]],
    ) -> dict[str, Any]:
        """Convert raw string param values using the tool schema types.

        Args:
            function_name: Name of the invoked tool.
            param_dict: Mapping of name to ``(raw value, string attribute)``.

        Returns:
            Converted parameters after ``_repair_param_dict`` has been
            applied.  Values flagged ``string="true"`` are kept verbatim.
        """
        param_config = find_tool_properties(self.tools, function_name)

        converted: dict[str, Any] = {}
        for name, (value, string_attr) in param_dict.items():
            if string_attr == "true":
                converted[name] = value
                continue

            param_types = extract_types_from_schema(param_config.get(name, {}))
            converted[name] = coerce_to_schema_type(value, param_types)
        return self._repair_param_dict(converted, param_config)

    def _get_param_config(self, function_name: str | None) -> dict[str, Any]:
        """Return the schema properties of a tool, or ``{}`` when either the
        function name or the tool list is missing/empty."""
        if not function_name or not self.tools:
            return {}
        return find_tool_properties(self.tools, function_name)

    @staticmethod
    def _json_escape_string_content(text: str) -> str:
        """Escape ``text`` for use inside a JSON string literal.

        The text is serialised with ``json.dumps`` and the surrounding
        double quotes are dropped, so fragments can be emitted one by one.
        """
        return json.dumps(text, ensure_ascii=False)[1:-1]

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """Extract tool calls from complete model output (non-streaming).

        Args:
            model_output: Full text produced by the model.
            request: The originating request (not read by this method).

        Returns:
            ``tools_called=True`` with one ``ToolCall`` per complete invoke
            and the text preceding the first start token as content (``None``
            when there is none).  When no start token is present, no complete
            invoke is found, or an exception occurs, ``tools_called=False``
            with the whole output as content.
        """
        # Quick check
        if self.tool_call_start_token not in model_output:
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

        try:
            tool_calls = []

            # Find all complete tool_call blocks
            for tool_call_match in self.tool_call_complete_regex.findall(model_output):
                # Find all invokes within this tool_call
                for invoke_name, invoke_content in self.invoke_complete_regex.findall(
                    tool_call_match
                ):
                    param_dict = self._parse_invoke_params(invoke_content)
                    params = self._convert_params_with_schema(invoke_name, param_dict)
                    tool_calls.append(
                        ToolCall(
                            type="function",
                            function=FunctionCall(
                                name=invoke_name,
                                arguments=json.dumps(params, ensure_ascii=False),
                            ),
                        )
                    )

            if not tool_calls:
                return ExtractedToolCallInformation(
                    tools_called=False, tool_calls=[], content=model_output
                )

            # Extract content before first tool call
            first_tool_idx = model_output.find(self.tool_call_start_token)
            content = model_output[:first_tool_idx] if first_tool_idx > 0 else None

            return ExtractedToolCallInformation(
                tools_called=True, tool_calls=tool_calls, content=content
            )

        except Exception:
            # Best effort: log and hand the raw text back as plain content.
            logger.exception("Error extracting tool calls")
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

    def _reset_streaming_state(self):
        """Reset all streaming state."""
        self.current_tool_index = 0
        self._sent_content_idx = 0
        self._buffer = ""
        self._in_tool_calls = False
        self._active_tool_index = None
        self._active_tool_name = None
        self._active_param_name = None
        self._active_param_string_attr = None
        self._active_param_mode = None
        # Lists are cleared in place, so existing references stay valid.
        self._active_param_parts.clear()
        self.prev_tool_call_arr.clear()
        self.streamed_args_for_tool.clear()
        self._args_started.clear()

    def _add_tool_call_delta(
        self,
        tool_call_deltas: dict[int, DeltaToolCall],
        index: int,
        *,
        call_id: str | None = None,
        call_type: Literal["function"] | None = None,
        name: str | None = None,
        arguments: str | None = None,
    ) -> None:
        """Record a fragment for tool ``index`` in this chunk's delta map.

        Non-empty ``arguments`` are also appended to
        ``self.streamed_args_for_tool[index]``.  If a delta for ``index``
        already exists, the supplied fields are merged into it and the
        argument text is concatenated.

        Args:
            tool_call_deltas: Deltas collected for the current chunk.
            index: Tool call index.
            call_id: Tool call id, if it should be set.
            call_type: Tool call type, if it should be set.
            name: Function name, if it should be set.
            arguments: Argument JSON fragment to append.
        """
        if arguments:
            self.streamed_args_for_tool[index] += arguments

        if index not in tool_call_deltas:
            tool_call_deltas[index] = DeltaToolCall(
                index=index,
                id=call_id,
                type=call_type,
                function=DeltaFunctionCall(name=name, arguments=arguments),
            )
            return

        delta = tool_call_deltas[index]
        if call_id is not None:
            delta.id = call_id
        if call_type is not None:
            delta.type = call_type
        if delta.function is None:
            delta.function = DeltaFunctionCall()
        if name is not None:
            delta.function.name = name
        if arguments is not None:
            delta.function.arguments = (delta.function.arguments or "") + arguments

    def _begin_streaming_tool_call(
        self,
        name: str,
        tool_call_deltas: dict[int, DeltaToolCall],
    ) -> None:
        """Open a new streamed tool call and emit its header delta.

        Allocates the next index, appends the per-tool bookkeeping entries
        and emits a delta carrying a new id, the type, the name and an empty
        argument string.
        """
        index = self.current_tool_index
        self.current_tool_index += 1
        self._active_tool_index = index
        self._active_tool_name = name
        self.prev_tool_call_arr.append({"name": name, "arguments": {}})
        self.streamed_args_for_tool.append("")
        self._args_started.append(False)
        self._add_tool_call_delta(
            tool_call_deltas,
            index,
            call_id=self._generate_tool_call_id(),
            call_type="function",
            name=name,
            arguments="",
        )

    def _append_param_prefix(
        self,
        tool_call_deltas: dict[int, DeltaToolCall],
        index: int,
        key: str,
        *,
        as_string: bool,
    ) -> None:
        """Emit the JSON text that precedes a parameter value.

        Emits ``{`` for the first parameter of the tool and ``,`` afterwards,
        then the JSON-encoded key and ``:``.  With ``as_string`` the opening
        quote of a string value is emitted as well.
        """
        prefix = "{" if not self._args_started[index] else ","
        self._args_started[index] = True
        arguments = prefix + json.dumps(key, ensure_ascii=False) + ":"
        if as_string:
            arguments += '"'
        self._add_tool_call_delta(tool_call_deltas, index, arguments=arguments)

    def _append_json_param_value(
        self,
        tool_call_deltas: dict[int, DeltaToolCall],
        index: int,
        key: str,
        value: Any,
    ) -> None:
        """Emit a complete ``"key":<json value>`` pair for tool ``index``."""
        self._append_param_prefix(tool_call_deltas, index, key, as_string=False)
        self._add_tool_call_delta(
            tool_call_deltas,
            index,
            arguments=json.dumps(value, ensure_ascii=False),
        )

    def _param_types_for_name(self, name: str) -> list[str]:
        """Return the schema types of ``name`` for the active tool.

        Falls back to ``["string"]`` when the parameter is not in the schema
        or its schema entry is not a dict.
        """
        param_config = self._get_param_config(self._active_tool_name)
        if name in param_config and isinstance(param_config[name], dict):
            return extract_types_from_schema(param_config[name])
        return ["string"]

    @staticmethod
    def _can_stream_raw_param(param_types: list[str]) -> bool:
        """Return True when every listed type is ``object`` or ``array``.

        An empty type list also yields True (the empty set is a subset).
        """
        # Scalars and unions need the complete value so streaming and
        # non-streaming share the same coercion fallback behavior.
        return set(param_types).issubset({"object", "array"})

    def _should_buffer_wrapper_param(self, name: str) -> bool:
        """Decide whether ``name`` may be an 'arguments' / 'input' wrapper.

        True only when a tool is active, none of its arguments have been
        emitted yet, a non-empty schema is available, and ``name`` is one of
        the wrapper names without being declared in that schema.
        """
        if (
            self._active_tool_index is None
            or self._args_started[self._active_tool_index]
        ):
            return False
        param_config = self._get_param_config(self._active_tool_name)
        return bool(
            param_config and name in ("arguments", "input") and name not in param_config
        )

    def _finish_buffered_param(
        self,
        tool_call_deltas: dict[int, DeltaToolCall],
        index: int,
    ) -> None:
        """Convert the fully buffered parameter and emit it as JSON pairs.

        The buffered chunks are joined and passed through
        ``_convert_params_with_schema`` (which may unwrap a wrapper into
        several keys); every resulting key/value pair is emitted.
        """
        assert self._active_param_name is not None
        assert self._active_param_string_attr is not None
        raw_value = "".join(self._active_param_parts)
        converted = self._convert_params_with_schema(
            self._active_tool_name or "",
            {self._active_param_name: (raw_value, self._active_param_string_attr)},
        )
        for key, value in converted.items():
            self._append_json_param_value(tool_call_deltas, index, key, value)

    def _close_streaming_tool_call(
        self,
        tool_call_deltas: dict[int, DeltaToolCall],
    ) -> None:
        """Finish the active tool call, if any.

        Emits the closing ``}`` (or ``{}`` when no parameter was emitted),
        stores the parsed arguments in ``prev_tool_call_arr`` and clears the
        active tool / parameter state.
        """
        index = self._active_tool_index
        if index is None:
            return

        suffix = "}" if self._args_started[index] else "{}"
        self._add_tool_call_delta(tool_call_deltas, index, arguments=suffix)
        try:
            self.prev_tool_call_arr[index] = {
                "name": self._active_tool_name,
                "arguments": json.loads(self.streamed_args_for_tool[index]),
            }
        except (json.JSONDecodeError, IndexError):
            # Keep streaming; the placeholder entry is left as it was.
            logger.exception("Failed to finalize DeepSeek DSML streaming tool call")

        self._active_tool_index = None
        self._active_tool_name = None
        self._active_param_name = None
        self._active_param_string_attr = None
        self._active_param_mode = None
        self._active_param_parts.clear()

    def _process_streaming_buffer(
        self,
        content_parts: list[str],
        tool_call_deltas: dict[int, DeltaToolCall],
    ) -> None:
        """Consume as much of ``self._buffer`` as can be interpreted.

        Runs a small state machine (outside tool calls, between invokes,
        inside an invoke, inside a parameter) and returns as soon as the
        buffer holds no further complete piece of markup.

        Args:
            content_parts: Receives plain-text content found outside tool
                call blocks.
            tool_call_deltas: Receives tool-call deltas keyed by tool index.
        """
        parameter_end_token = "</|DSML|parameter>"
        invoke_end_token = "</|DSML|invoke>"

        while True:
            # State 1: outside any function_calls block.
            if not self._in_tool_calls:
                start_idx = self._buffer.find(self.tool_call_start_token)
                if start_idx == -1:
                    # presumably the length of the buffer tail that could
                    # still grow into the start token; verify against
                    # partial_tag_overlap
                    overlap = partial_tag_overlap(
                        self._buffer, self.tool_call_start_token
                    )
                    sendable_idx = len(self._buffer) - overlap
                    if sendable_idx > 0:
                        content_parts.append(self._buffer[:sendable_idx])
                        self._buffer = self._buffer[sendable_idx:]
                    return

                if start_idx > 0:
                    # Flush the text in front of the start token first.
                    content_parts.append(self._buffer[:start_idx])
                    self._buffer = self._buffer[start_idx:]
                    continue

                self._buffer = self._buffer[len(self.tool_call_start_token) :]
                self._in_tool_calls = True
                continue

            # State 2: inside the block, no invoke open.
            if self._active_tool_index is None:
                stripped_len = len(self._buffer) - len(self._buffer.lstrip())
                if stripped_len:
                    self._buffer = self._buffer[stripped_len:]
                    continue

                if self._buffer.startswith(self.tool_call_end_token):
                    self._buffer = self._buffer[len(self.tool_call_end_token) :]
                    self._in_tool_calls = False
                    continue

                match = self.invoke_start_regex.match(self._buffer)
                if match is None:
                    # Opening tag not complete yet; wait for more text.
                    return

                self._buffer = self._buffer[match.end() :]
                self._begin_streaming_tool_call(match.group(1), tool_call_deltas)
                continue

            index = self._active_tool_index

            # State 3: inside a parameter value.
            if self._active_param_mode is not None:
                end_pos = self._buffer.find(parameter_end_token)
                if end_pos != -1:
                    raw_content = self._buffer[:end_pos]
                    self._buffer = self._buffer[end_pos + len(parameter_end_token) :]
                    if self._active_param_mode in ("wrapper", "buffered"):
                        self._active_param_parts.append(raw_content)
                        self._finish_buffered_param(tool_call_deltas, index)
                    elif self._active_param_mode == "string":
                        # Final string fragment plus the closing quote.
                        arguments = self._json_escape_string_content(raw_content) + '"'
                        self._add_tool_call_delta(
                            tool_call_deltas, index, arguments=arguments
                        )
                    else:
                        self._add_tool_call_delta(
                            tool_call_deltas, index, arguments=raw_content
                        )

                    self._active_param_name = None
                    self._active_param_string_attr = None
                    self._active_param_mode = None
                    self._active_param_parts.clear()
                    continue

                # No closing tag yet: pass on everything except the tail
                # reported by partial_tag_overlap, which stays buffered.
                overlap = partial_tag_overlap(self._buffer, parameter_end_token)
                safe_len = len(self._buffer) - overlap
                if safe_len > 0:
                    raw_content = self._buffer[:safe_len]
                    self._buffer = self._buffer[safe_len:]
                    if self._active_param_mode in ("wrapper", "buffered"):
                        self._active_param_parts.append(raw_content)
                    elif self._active_param_mode == "string":
                        self._add_tool_call_delta(
                            tool_call_deltas,
                            index,
                            arguments=self._json_escape_string_content(raw_content),
                        )
                    else:
                        self._add_tool_call_delta(
                            tool_call_deltas, index, arguments=raw_content
                        )
                return

            # State 4: inside an invoke, between parameters.
            stripped_len = len(self._buffer) - len(self._buffer.lstrip())
            if stripped_len:
                self._buffer = self._buffer[stripped_len:]
                continue

            if self._buffer.startswith(invoke_end_token):
                self._buffer = self._buffer[len(invoke_end_token) :]
                self._close_streaming_tool_call(tool_call_deltas)
                continue

            match = self.parameter_start_regex.match(self._buffer)
            if match is None:
                # Opening tag not complete yet; wait for more text.
                return

            self._buffer = self._buffer[match.end() :]
            name = match.group(1)
            string_attr = match.group(2)
            self._active_param_name = name
            self._active_param_string_attr = string_attr

            # Pick how the value is forwarded; nothing is emitted for the
            # "wrapper" and "buffered" modes until the value is complete.
            if self._should_buffer_wrapper_param(name):
                self._active_param_mode = "wrapper"
                continue

            if string_attr == "true":
                self._append_param_prefix(tool_call_deltas, index, name, as_string=True)
                self._active_param_mode = "string"
                continue

            param_types = self._param_types_for_name(name)
            if not self._can_stream_raw_param(param_types):
                self._active_param_mode = "buffered"
                continue

            self._append_param_prefix(tool_call_deltas, index, name, as_string=False)
            self._active_param_mode = "raw"

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],  # pylint: disable=unused-argument
        current_token_ids: Sequence[int],  # pylint: disable=unused-argument
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Extract tool calls from streaming model output.

        Buffers DSML markup while streaming tool-call metadata and argument
        JSON fragments as soon as they are complete enough to be valid deltas.

        Args:
            previous_text: Text produced before this chunk; an empty value
                triggers a reset of the streaming state.
            current_text: Not read by this method.
            delta_text: Newly produced text, appended to the buffer.
            previous_token_ids: Not read by this method.
            current_token_ids: Not read by this method.
            delta_token_ids: Token ids of this chunk; only their truthiness
                is checked.
            request: Not read by this method.

        Returns:
            A ``DeltaMessage`` with the content and/or tool-call deltas
            produced by this chunk; a ``DeltaMessage`` with empty content for
            a text-less chunk that still carries token ids once a tool call
            has been recorded; otherwise ``None``.
        """

        # First chunk of a new stream — reset state from prior request.
        if not previous_text:
            self._reset_streaming_state()

        self._buffer += delta_text
        content_parts: list[str] = []
        tool_call_deltas: dict[int, DeltaToolCall] = {}
        self._process_streaming_buffer(content_parts, tool_call_deltas)

        if content_parts or tool_call_deltas:
            content = "".join(content_parts) or None
            return DeltaMessage(
                content=content, tool_calls=list(tool_call_deltas.values())
            )

        # Empty delta with token ids means EOS or closing tag; return
        # non-None so the serving framework can finalize finish_reason.
        if not delta_text and delta_token_ids and self.prev_tool_call_arr:
            return DeltaMessage(content="")

        return None

_convert_params_with_schema

_convert_params_with_schema(
    function_name: str,
    param_dict: dict[str, tuple[str, str]],
) -> dict[str, Any]

Convert raw string param values using the tool schema types.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _convert_params_with_schema(
    self,
    function_name: str,
    param_dict: dict[str, tuple[str, str]],
) -> dict[str, Any]:
    """Turn raw parameter text into values typed according to the tool schema.

    Entries flagged ``string="true"`` are passed through untouched; every
    other entry is coerced to the types its schema property declares.  The
    result is finally handed to ``_repair_param_dict``.
    """
    schema_props = find_tool_properties(self.tools, function_name)

    typed: dict[str, Any] = {}
    for key, (raw, is_string) in param_dict.items():
        if is_string != "true":
            # Look up the declared types; an unknown key gets an empty schema.
            declared = extract_types_from_schema(schema_props.get(key, {}))
            typed[key] = coerce_to_schema_type(raw, declared)
        else:
            typed[key] = raw
    return self._repair_param_dict(typed, schema_props)

_generate_tool_call_id

_generate_tool_call_id() -> str

Generate a unique tool call ID.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _generate_tool_call_id(self) -> str:
    """Return a fresh ``call_``-prefixed identifier for one tool call."""
    # Only the first 24 characters of the random UUID's hex form are kept.
    suffix = uuid.uuid4().hex[:24]
    return "call_" + suffix

_repair_param_dict staticmethod

_repair_param_dict(
    param_dict: dict[str, Any], param_config: dict[str, Any]
) -> dict[str, Any]

Unwrap single 'arguments' / 'input' wrappers when the wrapper is not part of the requested tool schema and the wrapped object matches the schema fields.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
@staticmethod
def _repair_param_dict(
    param_dict: dict[str, Any],
    param_config: dict[str, Any],
) -> dict[str, Any]:
    """Unwrap single 'arguments' / 'input' wrappers when the wrapper
    is not part of the requested tool schema and the wrapped object
    matches the schema fields."""
    allowed = set(param_config.keys())
    for wrapper in ("arguments", "input"):
        if set(param_dict.keys()) != {wrapper} or wrapper in allowed:
            continue
        inner = param_dict[wrapper]
        if isinstance(inner, str):
            try:
                inner = json.loads(inner)
            except json.JSONDecodeError:
                return param_dict
        if isinstance(inner, dict) and set(inner.keys()).issubset(allowed):
            return inner
    return param_dict

_reset_streaming_state

_reset_streaming_state()

Reset all streaming state.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def _reset_streaming_state(self):
    """Return every piece of streaming bookkeeping to its initial value."""
    # Counters, text buffer and block flag.
    self.current_tool_index = self._sent_content_idx = 0
    self._buffer = ""
    self._in_tool_calls = False

    # Nothing is open at the start of a stream.
    for attr in (
        "_active_tool_index",
        "_active_tool_name",
        "_active_param_name",
        "_active_param_string_attr",
        "_active_param_mode",
    ):
        setattr(self, attr, None)

    # Lists are emptied in place rather than replaced.
    for attr in (
        "_active_param_parts",
        "prev_tool_call_arr",
        "streamed_args_for_tool",
        "_args_started",
    ):
        getattr(self, attr).clear()

extract_tool_calls

extract_tool_calls(
    model_output: str, request: ChatCompletionRequest
) -> ExtractedToolCallInformation

Extract tool calls from complete model output (non-streaming).

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def extract_tool_calls(
    self,
    model_output: str,
    request: ChatCompletionRequest,
) -> ExtractedToolCallInformation:
    """Parse every tool call out of a finished completion.

    When the start token is absent, no complete invoke is found, or parsing
    raises, the whole output is returned as plain content with
    ``tools_called=False``.  Otherwise one ``ToolCall`` is produced per
    invoke and the text in front of the first start token becomes the
    content (``None`` if there is no such text).
    """
    start_token = self.tool_call_start_token

    # Fast path: no marker at all means there is nothing to parse.
    if start_token not in model_output:
        return ExtractedToolCallInformation(
            tools_called=False, tool_calls=[], content=model_output
        )

    try:
        parsed_calls = []

        # Outer loop: complete function_calls blocks; inner loop: invokes.
        for block_body in self.tool_call_complete_regex.findall(model_output):
            for fn_name, fn_body in self.invoke_complete_regex.findall(block_body):
                raw_params = self._parse_invoke_params(fn_body)
                typed_params = self._convert_params_with_schema(fn_name, raw_params)
                arguments_json = json.dumps(typed_params, ensure_ascii=False)
                call = ToolCall(
                    type="function",
                    function=FunctionCall(name=fn_name, arguments=arguments_json),
                )
                parsed_calls.append(call)

        if parsed_calls:
            # Text ahead of the first marker is ordinary assistant content.
            prefix_len = model_output.find(start_token)
            if prefix_len > 0:
                leading_text = model_output[:prefix_len]
            else:
                leading_text = None
            return ExtractedToolCallInformation(
                tools_called=True, tool_calls=parsed_calls, content=leading_text
            )

        return ExtractedToolCallInformation(
            tools_called=False, tool_calls=[], content=model_output
        )

    except Exception:
        logger.exception("Error extracting tool calls")
        return ExtractedToolCallInformation(
            tools_called=False, tool_calls=[], content=model_output
        )

extract_tool_calls_streaming

extract_tool_calls_streaming(
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],
    current_token_ids: Sequence[int],
    delta_token_ids: Sequence[int],
    request: ChatCompletionRequest,
) -> DeltaMessage | None

Extract tool calls from streaming model output.

Buffers DSML markup while streaming tool-call metadata and argument JSON fragments as soon as they are complete enough to be valid deltas.

Source code in vllm/tool_parsers/deepseekv32_tool_parser.py
def extract_tool_calls_streaming(
    self,
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],  # pylint: disable=unused-argument
    current_token_ids: Sequence[int],  # pylint: disable=unused-argument
    delta_token_ids: Sequence[int],
    request: ChatCompletionRequest,
) -> DeltaMessage | None:
    """Feed one streamed chunk to the parser and report what it produced.

    The new text is appended to the internal buffer and the buffer is
    processed; plain content and tool-call fragments that became available
    are returned together in one ``DeltaMessage``.  ``None`` is returned
    when the chunk produced nothing, except for the terminal case described
    in the body.
    """
    # An empty ``previous_text`` marks the first chunk of a stream, so any
    # state left over from an earlier request is discarded.
    if not previous_text:
        self._reset_streaming_state()

    self._buffer += delta_text
    text_chunks: list[str] = []
    deltas_by_index: dict[int, DeltaToolCall] = {}
    self._process_streaming_buffer(text_chunks, deltas_by_index)

    if not (text_chunks or deltas_by_index):
        # Empty delta with token ids means EOS or closing tag; return
        # non-None so the serving framework can finalize finish_reason.
        if not delta_text and delta_token_ids and self.prev_tool_call_arr:
            return DeltaMessage(content="")
        return None

    joined_text = "".join(text_chunks)
    return DeltaMessage(
        content=joined_text if joined_text else None,
        tool_calls=list(deltas_by_index.values()),
    )