Plugins
A Plugin is an abstraction that will let your users add your feature to their Workflows in one line. It makes it seamless for platform developers to add their own custom functionality to many Workflows. Using plugins, you can build reusable open source libraries or build add-ons for engineers at your company.
Here are some common use cases for plugins:
- AI Agent SDKs
- Observability, tracing, or logging middleware
- Adding reliable built-in functionality such as LLM calls, corporate messaging, and payments infrastructure
- Encryption or compliance middleware
How to build a Plugin
The recommended way to start building plugins is with a SimplePlugin. This abstraction will tackle the vast majority of plugins people want to write.
For advanced use cases, you can extend the methods in the lower-level classes that SimplePlugin is based on without re-implementing what you’ve already built. See the Advanced Topics for Plugins section for more information.
Example Plugins
If you prefer to learn by getting hands-on with code, check out some existing plugins.
- Temporal's Python SDK ships with an OpenAI Agents SDK plugin
- Pydantic plugin (to be listed here once it’s fixed)
What you can provide to users in a plugin
There are a number of features you can give your users with a plugin. Here's a short list of some of the things you can do; more are being added.
- Built-in Activities
- Workflow libraries
- Built-in Child Workflows
- Built-in Nexus Operations
- Custom Data Converters
- Interceptors
Built-in Activity
You can provide built-in Activities in a Plugin for users to call from their Workflows. Activities are the most common Temporal primitive and should be scoped to:
- Single write operations
- Batches of similar writes
- One or more read operations followed by a write operation
- A read that should be memoized, like an LLM call, a large download, or a slow-polling read
Larger pieces of functionality should be broken up into multiple Activities. Smaller Activities make failure recovery easier, allow shorter timeouts, and are easier to keep idempotent.
Here are some best practices you can use when you are making Activity plugins:
- Activity arguments and return values must be serializable.
- Activities that perform writes should be idempotent.
- Activities have timeouts and retry policies. To be Activity-friendly, your operation should either complete within a few minutes or it should support the ability to heartbeat or poll for a result. This way it will be clear to the Workflow when the Activity is still making progress.
- You need to specify at least one timeout, typically the start_to_close_timeout. Keep in mind that the shorter the timeout, the sooner Temporal will retry after a failure. See the Timeouts and retry policies section to learn more.
Timeouts and retry policies
Temporal's Activity retry mechanism gives applications the benefits of durable execution. For example, Temporal will keep track of the exponential backoff delay even if the Worker crashes. Since Temporal can’t tell when a Worker crashes, Workflows rely on the start_to_close_timeout to know how long to wait before assuming that an Activity is inactive.
Be cautious about retrying inside your Activity: the start_to_close_timeout then has to be longer than the longest possible internal retry sequence, which means it takes longer to recover from other causes of failure, such as a Worker crash. Internal retries also hide failures from Temporal's failure metrics and make problems harder for users to debug in the Temporal UI.
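To make the cost of internal retries concrete, here is a small back-of-the-envelope sketch. The function name and the numbers are illustrative assumptions, not Temporal defaults. It adds up the worst-case duration of a retry loop inside an Activity, which is a lower bound for the start_to_close_timeout you would have to set.

```python
def worst_case_internal_retry_seconds(
    attempts: int,
    per_attempt_timeout: float,
    initial_backoff: float,
    backoff_coefficient: float,
) -> float:
    """Longest time an in-Activity retry loop can take before giving up."""
    total = 0.0
    backoff = initial_backoff
    for attempt in range(attempts):
        total += per_attempt_timeout      # the attempt itself times out
        if attempt < attempts - 1:        # no sleep after the last attempt
            total += backoff
            backoff *= backoff_coefficient
    return total


# Illustrative numbers: 5 attempts of up to 30 s each, backoff starting at 1 s
# and doubling. The start_to_close_timeout would have to exceed this value.
internal = worst_case_internal_retry_seconds(5, 30.0, 1.0, 2.0)
print(internal)  # 165.0
```

With a single attempt per Activity execution the bound drops to 30 seconds, and the Activity retry policy handles the backoff between attempts instead.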
Follow the example for your SDK below:
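from temporalio import activity
from temporalio.plugin import SimplePlugin

@activity.defn
async def some_activity() -> None:
    return None

plugin = SimplePlugin(
    name="my-plugin",  # hypothetical plugin name
    activities=[some_activity],
)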
Workflow libraries
You can provide a library with functionality for use within a Workflow. Your library will call elements you include in your Plugin, such as Activities, Child Workflows, Signals, Updates, Queries, Nexus Operations, Interceptors, and Data Converters. It can also contain any other code, as long as that code follows these requirements:
- It should be deterministic, running the same way every time it’s executed. Non-deterministic code should go in Activities or Nexus Operations.
- It should work when run inside the Python Workflow sandbox.
- It should be designed to handle being restarted, resumed, or executed independently of where or how it originally began without losing correctness or state consistency.
- It should run quickly since it may run many times during a long Workflow execution.
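The determinism requirement can be illustrated with a toy model that uses no Temporal APIs. In the sketch below, RecordedResults is a hypothetical stand-in for the way completed Activity results are recorded and reused on re-execution: the non-deterministic call runs once, and every later run of the library code reads the recorded value, so it takes the same branch.

```python
import random
from typing import Callable


class RecordedResults:
    """Toy stand-in for recorded Activity results; not a Temporal API."""

    def __init__(self) -> None:
        self._results: list[int] = []
        self._cursor = 0

    def run(self, fn: Callable[[], int]) -> int:
        if self._cursor < len(self._results):
            value = self._results[self._cursor]  # re-execution: reuse the record
        else:
            value = fn()                         # first execution: run and record
            self._results.append(value)
        self._cursor += 1
        return value

    def rewind(self) -> None:
        """Start reading the recorded results from the beginning again."""
        self._cursor = 0


def library_logic(results: RecordedResults) -> str:
    """Deterministic given the recorded results, even though the source is random."""
    roll = results.run(lambda: random.randint(1, 6))
    return "retry-later" if roll <= 2 else "proceed"
```

Calling random.randint directly inside library_logic would break this property, because a re-execution could take a different branch than the original run.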
A Plugin should allow a user to decompose their Workflows into Activities, as well as Child Workflows and Nexus Calls when needed. This gives users granular control through retries and timeouts, debuggability through the Temporal UI, operability with resets, pauses, and cancels, memoization for efficiency and resumability, and scalability using task queues and Workers.
Users use Workflows for:
- Orchestration and decision-making
- Interactivity via message-passing
- Tracing and observability
Making changes to your Workflow Library
Your users may want to keep their Workflows running across deployments of their code. If their deployment includes a new version of your Plugin, changes to your Plugin could break Workflow code that started before the new version was deployed. This can be due to non-deterministic behavior from code changes in your Plugin.
Therefore, as you make changes, you need to use patching and replay testing to make sure that you’re not causing non-determinism errors for your users.
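The idea behind patching can be sketched with a toy model. ToyExecution and greeting_steps are hypothetical names, and the model is only loosely based on the SDK's patching API (workflow.patched in Python): new executions record a marker and take the new code path, while replays of older executions, which have no marker, keep the original path.

```python
from dataclasses import dataclass, field


@dataclass
class ToyExecution:
    """Toy model of one Workflow execution; not a Temporal API."""

    replaying: bool
    markers: set[str] = field(default_factory=set)

    def patched(self, patch_id: str) -> bool:
        if self.replaying:
            # Replay: take the new path only if the original run recorded it.
            return patch_id in self.markers
        # New execution: record the marker and take the new path.
        self.markers.add(patch_id)
        return True


def greeting_steps(execution: ToyExecution) -> list[str]:
    """Library code whose behavior changed between plugin versions."""
    if execution.patched("send-welcome-email-v2"):
        return ["render_template", "send_email"]  # new behavior
    return ["send_email"]                         # original behavior
```

A replay of a Workflow that started before the change still produces ["send_email"], so its recorded history continues to match the code.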
Example of a Workflow library that uses a Plugin in Python
Built-in Child Workflows
You can provide a built-in Workflow in a SimplePlugin. It’s callable as a Child Workflow or standalone. When you want to provide a piece of functionality that's more complex than an Activity, you can:
- Use a Workflow Library that runs directly in the end user’s Workflow
- Add a Child Workflow
Consider adding a Child Workflow when one or more of these conditions apply:
- The child should outlive the parent.
- The Workflow Event History would otherwise not scale in parent Workflows.
- You want a separate Workflow ID for the child so that it can be operated (canceled, terminated, paused) independently of the parent's state.
Any Workflow can be run as a standalone Workflow or as a Child Workflow, so registering a Child Workflow in a SimplePlugin is the same as registering any Workflow.
Follow the example for your SDK below:
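from temporalio import workflow
from temporalio.client import Client
from temporalio.plugin import SimplePlugin
from temporalio.worker import Worker

@workflow.defn
class HelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return f"Hello, {name}!"

plugin = SimplePlugin(
    name="my-plugin",  # hypothetical plugin name
    workflows=[HelloWorkflow],
)
...
# The rest runs inside an async function.
client = await Client.connect(
    "localhost:7233",
    plugins=[
        plugin,
    ],
)
async with Worker(
    client,
    task_queue="task-queue",
) as worker:
    result = await client.execute_workflow(
        HelloWorkflow.run,
        "Tim",
        id="hello-workflow-id",  # hypothetical Workflow ID; an ID is required
        task_queue=worker.task_queue,
    )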
Built-in Nexus Operations
Nexus calls are made from Workflows in much the same way as Activities; see the common Nexus Use Cases for examples. Like Activities, Nexus call arguments and return values must be serializable.
To register Nexus service handlers in a Plugin, follow the example for your SDK below:
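from dataclasses import dataclass

import nexusrpc
import nexusrpc.handler
from temporalio.plugin import SimplePlugin

# Input and output types inferred from how the handler below uses them.
@dataclass
class WeatherInput:
    city: str

@dataclass
class Weather:
    city: str
    temperature_range: str
    conditions: str

@nexusrpc.service
class WeatherService:
    get_weather_nexus_operation: nexusrpc.Operation[WeatherInput, Weather]

@nexusrpc.handler.service_handler(service=WeatherService)
class WeatherServiceHandler:
    @nexusrpc.handler.sync_operation
    async def get_weather_nexus_operation(
        self, ctx: nexusrpc.handler.StartOperationContext, input: WeatherInput
    ) -> Weather:
        return Weather(
            city=input.city, temperature_range="14-20C", conditions="Sunny with wind."
        )

plugin = SimplePlugin(
    name="my-plugin",  # hypothetical plugin name
    nexus_service_handlers=[WeatherServiceHandler()],
)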
Custom Data Converters
A Custom Data Converter can alter data formats or provide compression or encryption.
Note that you can use an existing Data Converter such as PydanticPayloadConverter in your Plugin.
To add a Custom Data Converter to a Plugin, follow the example for your SDK below:
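from typing import Optional

import temporalio.converter
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.converter import DataConverter
from temporalio.plugin import SimplePlugin

def add_converter(converter: Optional[DataConverter]) -> DataConverter:
    if converter is None or converter == temporalio.converter.DataConverter.default:
        return pydantic_data_converter
    # Should consider interactions with other plugins,
    # as this will override the data converter.
    # This may mean failing, warning, or something else
    return converter

plugin = SimplePlugin(
    name="my-plugin",  # hypothetical plugin name
    data_converter=add_converter,
)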
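The compression use case mentioned above can be sketched with the standard library alone. The encode and decode functions below are hypothetical helpers that operate on raw bytes, and the one-byte flag format is an assumption made for this example. In a real plugin this logic would sit behind the SDK's converter or codec interfaces rather than stand alone.

```python
import zlib

# One-byte flags (an assumption of this sketch) mark how the body was stored.
_RAW = b"\x00"
_COMPRESSED = b"\x01"


def encode(data: bytes) -> bytes:
    """Compress payload bytes, but only when that actually saves space."""
    compressed = zlib.compress(data)
    if len(compressed) < len(data):
        return _COMPRESSED + compressed
    return _RAW + data


def decode(data: bytes) -> bytes:
    """Reverse encode(), using the leading flag to pick the right path."""
    flag, body = data[:1], data[1:]
    if flag == _COMPRESSED:
        return zlib.decompress(body)
    if flag == _RAW:
        return body
    raise ValueError("unknown encoding flag")
```

Tagging every payload, including the uncompressed ones, keeps decode unambiguous: it never has to guess whether a given byte string was compressed.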
Interceptors
Interceptors are middleware that can run before and after various calls such as Activities, Workflows, and Signals. See the interceptors documentation for the details of implementing them. They're used to:
- Create side effects such as logging and tracing.
- Modify arguments, such as adding headers for authorization or tracing propagation.
To add one to a Plugin, follow the example for your SDK below:
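import temporalio.worker
from temporalio.plugin import SimplePlugin

class SomeWorkerInterceptor(temporalio.worker.Interceptor):
    # ...
    pass

plugin = SimplePlugin(
    name="my-plugin",  # hypothetical plugin name
    worker_interceptors=[SomeWorkerInterceptor()],
)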
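The two interceptor uses listed above, side effects and argument modification, can be modeled in plain Python. This is a sketch of the general middleware pattern, not of the SDK's interceptor classes; the names Call, Handler, logging_interceptor, auth_header_interceptor, and run_activity are all hypothetical.

```python
from typing import Callable

# A call carries a name and headers; a handler turns it into a result.
Call = dict
Handler = Callable[[Call], str]


def logging_interceptor(next_handler: Handler, log: list[str]) -> Handler:
    """Side effect: record a line before and after the wrapped call."""
    def handle(call: Call) -> str:
        log.append(f"start {call['name']}")
        result = next_handler(call)
        log.append(f"end {call['name']}")
        return result
    return handle


def auth_header_interceptor(next_handler: Handler, token: str) -> Handler:
    """Modify the call: add an authorization header before passing it on."""
    def handle(call: Call) -> str:
        headers = {**call.get("headers", {}), "authorization": token}
        return next_handler({**call, "headers": headers})
    return handle


def run_activity(call: Call) -> str:
    """Innermost handler, standing in for the real Activity invocation."""
    return f"{call['name']} ran with {sorted(call['headers'])}"


log: list[str] = []
chain = logging_interceptor(auth_header_interceptor(run_activity, "token-123"), log)
```

Each interceptor receives the next handler in the chain and decides what to do before and after delegating to it, which is the same shape as the next-style methods used elsewhere in this guide.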
Advanced Topics for Plugins
Under the hood, SimplePlugin combines two lower-level Plugin classes that you can use directly for more flexibility: a Worker Plugin and a client Plugin.
- Worker Plugins contain functionality that runs inside your users’ Workflows.
- Client Plugins contain functionality that runs when Workflows are started and when they return results.
If your Plugin implements both of them, registering it in the client will also register it in Workers created with that client.
Client Plugin
Client Plugins are provided to the Temporal client on creation. They can change client configurations and service client configurations. ClientConfig contains settings like client Interceptors and DataConverters. ConnectConfig configures the actual network connections to the local or cloud Temporal server with values like an API key. This is the basic implementation of a client Plugin:
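from typing import Awaitable, Callable

import temporalio.client
import temporalio.service
from temporalio.client import ClientConfig
from temporalio.service import ConnectConfig, ServiceClient

class MyAdvancedClientPlugin(temporalio.client.Plugin):
    def configure_client(self, config: ClientConfig) -> ClientConfig:
        return config

    async def connect_service_client(
        self,
        config: ConnectConfig,
        next: Callable[[ConnectConfig], Awaitable[ServiceClient]],
    ) -> temporalio.service.ServiceClient:
        return await next(config)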
The primary use case for integrations so far is setting a DataConverter, like in the Data Converter example.
Worker Plugin
Worker Plugins are provided at Worker creation. They have more capabilities than client Plugins, and correspondingly more to implement. They can change Worker configurations, run code during the Worker lifetime, and manage the Replayer in a similar way. You can learn more about the Replayer in a later section.
Similar to configure_client above, you implement configure_worker and configure_replayer to change any necessary configurations. In addition, run_worker allows you to execute code before and after the Worker runs. This can be used to set up resources or globals for use during the Worker execution. run_replayer does the same for the Replayer, but keep in mind that the Replayer has a more complex return type. This is a basic implementation of a Worker plugin:
from contextlib import AbstractAsyncContextManager
from typing import AsyncIterator, Awaitable, Callable

import temporalio.worker
from temporalio.client import WorkflowHistory
from temporalio.worker import (
    Replayer,
    ReplayerConfig,
    Worker,
    WorkerConfig,
    WorkflowReplayResult,
)

class MyAdvancedWorkerPlugin(temporalio.worker.Plugin):
    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        return config

    async def run_worker(
        self, worker: Worker, next: Callable[[Worker], Awaitable[None]]
    ) -> None:
        # Await the rest of the chain so the Worker actually runs.
        await next(worker)

    def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
        return config

    def run_replayer(
        self,
        replayer: Replayer,
        histories: AsyncIterator[WorkflowHistory],
        next: Callable[
            [Replayer, AsyncIterator[WorkflowHistory]],
            AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
        ],
    ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
        return next(replayer, histories)
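The "code before and after the Worker runs" pattern can be tried out with asyncio alone. In this sketch the Worker is replaced by a plain string and fake_worker_run stands in for the real run loop; both, along with run_worker_with_resource and the events list, are hypothetical names used only to show the shape of the wrapper.

```python
import asyncio
from typing import Awaitable, Callable

events: list[str] = []


async def run_worker_with_resource(
    worker: str, next: Callable[[str], Awaitable[None]]
) -> None:
    """Set up a resource, delegate to the rest of the chain, then clean up."""
    events.append("open connection pool")
    try:
        await next(worker)  # the Worker runs for its whole lifetime here
    finally:
        # Cleanup runs even if the Worker fails or is cancelled.
        events.append("close connection pool")


async def fake_worker_run(worker: str) -> None:
    """Stand-in for the real Worker run loop."""
    events.append(f"{worker} running")


asyncio.run(run_worker_with_resource("worker-1", fake_worker_run))
```

Using try/finally around the awaited call is one way to make sure resources set up for the Worker are released when it shuts down.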
Replayer
The Replayer allows Workflow authors to validate that their Workflows will work after changes to either the Workflow or a library they depend on. It’s normally used in test runs or when testing Workers before they roll out in production.
The Replayer runs on a Workflow History created by a previous Workflow run. Suppose something in the Workflow or underlying code has changed in a way which could potentially cause a non-determinism error. In that case, the Replayer will notice the change in the way it runs compared to the history provided.
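The comparison described above can be modeled in a few lines. This is a toy, not the SDK's Replayer: the replay function, the command strings, and the NonDeterminismError class are hypothetical, and a real history contains far more than a list of scheduled steps.

```python
from typing import Callable


class NonDeterminismError(Exception):
    """Raised by the toy replay below; not the SDK's exception type."""


def replay(workflow_fn: Callable[[], list[str]], recorded: list[str]) -> None:
    """Re-run workflow code and compare its commands with the history."""
    replayed = workflow_fn()
    if replayed != recorded:
        raise NonDeterminismError(
            f"history has {recorded}, code produced {replayed}"
        )


def workflow_v1() -> list[str]:
    return ["schedule charge_card", "schedule send_receipt"]


def workflow_v2() -> list[str]:
    # A code change that reorders steps without patching.
    return ["schedule send_receipt", "schedule charge_card"]


history = workflow_v1()       # commands recorded by the original run
replay(workflow_v1, history)  # same code: replay succeeds silently
```

Replaying workflow_v2 against the same history raises NonDeterminismError, which is the kind of mismatch replay testing is meant to catch before a rollout.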
The Replayer is typically configured identically to the Worker and client. If you’re using SimplePlugin, this is already handled for you.
If you need to do something custom for the Replayer, you can configure it directly:
class MyAdvancedWorkerPlugin(temporalio.worker.Plugin):
    def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
        return config

    def run_replayer(
        self,
        replayer: Replayer,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
        next: Callable[
            [Replayer, AsyncIterator[WorkflowHistory]],
            AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
        ],
    ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
        return next(replayer, histories)