Create Plugins
Step-by-step instructions for creating plugins.
The plugin system is a powerful mechanism that gives you dynamic control over request inputs and response outputs at each DeepFellow endpoint.
Prerequisites
To create a plugin, you need an installed DeepFellow Server.
Step-by-Step Instructions
Create directory
Navigate to the server directory. By default, it is located at ~/.deepfellow/server/. In this tutorial, the example plugin is called custom_plugin.
$ cd ~/.deepfellow/server
$ ls -la
total 16
drwxr-xr-x 2 simplito simplito 4096 Sep 17 10:23 .
drwxr-xr-x 4 simplito simplito 4096 Sep 16 20:28 ..
-rw-r--r-- 1 simplito simplito 3528 Sep 17 10:23 docker-compose.yml
-rw-r--r-- 1 simplito simplito 993 Sep 17 10:04 .env
$ mkdir -p plugins/custom_plugin
$ cd plugins/custom_plugin
Configuration
Configuration is in the config.json file.
{
  "name": "custom_plugin",
  "version": "1.0.0",
  "description": "Prints arguments and the response",
  "author": "DeepFellow Tutorial",
  "enabled": true,
  "routes": ["/v1/chat/completions"],
  "priority": 10
}
- name – the name of the plugin. It identifies the plugin in the system, so it must be unique.
- version – the version of the plugin. It appears in the logs, so keep it updated.
- description – (optional) describes what the plugin does.
- author – specifies who created the plugin.
- enabled – boolean field that specifies whether the plugin should be enabled on the first run. Useful in development.
- routes – (optional) list of endpoints to connect to. In this example, the plugin connects to Chat Completions. If not provided, the plugin is called on all DeepFellow Server endpoints.
- priority – defines the order of the plugins. It is a FIFO (first in, first out) priority. A lower number means closer to the endpoint.
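Because a typo in config.json is easy to make, it can help to check the file before starting the server. The sketch below is a hypothetical standalone helper, not part of DeepFellow: the name validate_plugin_config and the rule that every field not marked "(optional)" above is required are assumptions made for illustration.

```python
import json
from typing import Any

# Assumption: fields not marked "(optional)" in the list above are required.
REQUIRED_FIELDS: dict[str, type] = {
    "name": str,
    "version": str,
    "author": str,
    "enabled": bool,
    "priority": int,
}
OPTIONAL_FIELDS: dict[str, type] = {"description": str, "routes": list}


def _type_ok(value: Any, expected: type) -> bool:
    # bool is a subclass of int in Python, so reject True/False as a priority.
    if expected is int:
        return isinstance(value, int) and not isinstance(value, bool)
    return isinstance(value, expected)


def validate_plugin_config(raw: str) -> list[str]:
    """Hypothetical helper: return a list of problems, empty if none were found."""
    try:
        config = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(config, dict):
        return ["top-level value must be a JSON object"]

    problems: list[str] = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in config:
            problems.append(f"missing required field: {name}")
        elif not _type_ok(config[name], expected):
            problems.append(f"{name} must be of type {expected.__name__}")
    for name, expected in OPTIONAL_FIELDS.items():
        if name in config and not _type_ok(config[name], expected):
            problems.append(f"{name} must be of type {expected.__name__}")
    return problems
```

Calling validate_plugin_config() with the contents of config.json returns an empty list when every field has the expected type. The server may apply different or stricter rules, so treat this only as a quick local sanity check.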
Minimal functionality in the custom plugin
Create a simple plugin that will print a message to the logs when initialized. In the plugin.py file:
from server.core.plugin_manager import BasePlugin, PluginContext
from server.utils.logger import uvicorn_logger


class MyCustomPlugin(BasePlugin):
    async def initialize(self) -> bool:
        # Called when the plugin is initialized
        uvicorn_logger.info("Hello world")
        return True
After creating the plugin, you'll see it report its initialization in the server logs:
INFO: Connected to Filestorage temporary path: storage/tmp
Hello world
INFO: Loaded plugin: custom_plugin, v1.0.0
INFO: Loaded plugin: df_abuse_detection_plugin, v1.0.0
INFO: 🎯 [DF TEST PLUGIN] initialized
INFO: Loaded plugin: df_test_plugin, v1.0.0
INFO: Loaded plugin: df_anonymize_plugin, v1.0.0
INFO: Discovered 4 plugins: ['custom_plugin', 'df_abuse_detection_plugin', 'df_test_plugin', 'df_anonymize_plugin']
INFO: DeepFellow Server started
INFO: Application startup complete.
You can also reload the plugin using the API:
- Log in to the server as admin to receive the token:
curl -X 'POST' \
  'https://deepfellow-server-host/auth/login' \
  -H 'Content-Type: application/json' \
  -d '{
  "email": "admin@example.com",
  "password": "password123"
}' | jq '.'
{
  "access_token": "dfuser_92724d86-028f-40fa-8af1-872efea8d503",
  "expired_at": 1791297710
}
- Discover new plugins:
curl -X 'POST' \
  'https://deepfellow-server-host/plugins/discover' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer dfuser_92724d86-028f-40fa-8af1-872efea8d503' \
  -d '' | jq '.'
{
  "success": true,
  "message": "Plugin discovery started",
  "data": null
}
- List plugins:
curl -X 'GET' \
  'https://deepfellow-server-host/plugins' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer dfuser_92724d86-028f-40fa-8af1-872efea8d503'
{
  "plugins": [
    {
      "name": "custom_plugin",
      "version": "1.0.0",
      "enabled": true,
      "priority": 10,
      "routes": [
        "/v1/chat/completions"
      ],
      "description": "Prints arguments and the response"
    },
    {
      "name": "df_abuse_detection_plugin",
      "version": "1.0.0",
      "enabled": true,
      "priority": 100,
      "routes": [
        "/v1/chat/completions"
      ],
      "description": "Checks prompts for abuse"
    },
    // [...]
  ],
  "total": 4
}
Connect to the endpoint before the request
Add a method that logs a message when it is called. Use uvicorn_logger.info() instead of print() so the message appears in the Docker logs. The whole plugin.py file:
from server.core.plugin_manager import BasePlugin, PluginContext
from server.utils.logger import uvicorn_logger


class MyCustomPlugin(BasePlugin):
    async def initialize(self) -> bool:
        uvicorn_logger.info("Hello world")
        return True

    async def before_request(self, context: PluginContext) -> PluginContext:
        # Called just before the endpoint; the context is passed through unchanged
        uvicorn_logger.info("Request started.")
        return context
The before_request() method changes nothing: it is called just before the endpoint and logs "Request started."
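To picture where before_request() sits in the request flow, the following self-contained sketch runs a stand-in plugin in front of a stand-in endpoint. Everything in it (FakeContext, LoggingPlugin, call_with_hook, fake_endpoint) is hypothetical and written only for illustration. It is not how DeepFellow Server dispatches plugins internally, which this tutorial does not describe.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class FakeContext:
    """Hypothetical stand-in for PluginContext with only the fields used here."""
    metadata: dict[str, Any] = field(default_factory=dict)
    function_kwargs: dict[str, Any] = field(default_factory=dict)


class LoggingPlugin:
    """Mirrors MyCustomPlugin.before_request, but records messages in a list."""

    def __init__(self) -> None:
        self.messages: list[str] = []

    async def before_request(self, context: FakeContext) -> FakeContext:
        self.messages.append("Request started.")
        return context  # unchanged, so the endpoint sees the original input


async def fake_endpoint(body: str) -> str:
    # Stand-in for a real endpoint such as Chat Completions.
    return f"echo: {body}"


async def call_with_hook(
    plugin: LoggingPlugin,
    endpoint: Callable[..., Awaitable[str]],
    context: FakeContext,
) -> str:
    # Assumed flow: run the hook first, then call the endpoint with the
    # keyword arguments carried by the (possibly modified) context.
    context = await plugin.before_request(context)
    return await endpoint(**context.function_kwargs)


def run_demo() -> tuple[list[str], str]:
    plugin = LoggingPlugin()
    context = FakeContext(
        metadata={"route": "/v1/chat/completions", "method": "POST"},
        function_kwargs={"body": "Hello!"},
    )
    result = asyncio.run(call_with_hook(plugin, fake_endpoint, context))
    return plugin.messages, result
```

Because the hook returns the context it received, the endpoint's input stays the same; only the recorded message shows that the hook ran.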
Call the Chat Completions endpoint:
curl -X 'POST' \
  'https://deepfellow-server-host/v1/chat/completions' \
  -H 'OpenAI-Organization: 68d38dd95ba3123af100df5c' \
  -H 'OpenAI-Project: 68d38dd95ba3123af100df5d' \
  -H 'Authorization: Bearer dfuser_92724d86-028f-40fa-8af1-872efea8d503' \
  -H 'Content-Type: application/json' \
  -d '{
  "messages": [
    {
      "content": "You are a helpful assistant.",
      "role": "system"
    },
    {
      "content": "Hello!",
      "role": "user"
    }
  ],
  "model": "llama3.1:8b",
  "stream": false
}'
After the call, you will see the following in the logs:
INFO: Request started.
INFO: DFAnonymizePlugin: Anonymizing requst
Connect to the result of the endpoint
Add a uvicorn_logger.info() call to the after_request() method.
from fastapi.responses import StreamingResponse

from server.core.plugin_manager import BasePlugin, PluginContext
from server.utils.logger import uvicorn_logger


class MyCustomPlugin(BasePlugin):
    async def initialize(self) -> bool:
        uvicorn_logger.info("Hello world")
        return True

    async def before_request(self, context: PluginContext) -> PluginContext:
        uvicorn_logger.info("Request started.")
        return context

    async def after_request(self, context: PluginContext, result: StreamingResponse) -> StreamingResponse:
        # Called with the endpoint's result; the result is returned unchanged
        uvicorn_logger.info("Request completed.")
        return result
Log the request
Both methods receive the context: PluginContext argument. It provides the following information:
- request – fastapi.Request
- response – fastapi.Response
- user_id – str. User identification.
- metadata – dict[str, Any]. Information about the request. In this case, {'route': '/v1/chat/completions', 'method': 'POST'}.
- function_args – tuple. Positional arguments used when calling the endpoint.
- function_kwargs – dict[str, Any]. Keyword arguments used when calling the endpoint.
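As an illustration of how these fields might be read together, here is a small sketch that turns a context into a one-line summary suitable for logging. The helper describe_context is hypothetical, and the test object is a plain SimpleNamespace standing in for PluginContext; only the attribute names come from the list above.

```python
from types import SimpleNamespace
from typing import Any


def describe_context(context: Any) -> str:
    """Hypothetical helper: summarize a context-like object in one line."""
    route = context.metadata.get("route", "?")
    method = context.metadata.get("method", "?")
    # Log only the names of the keyword arguments, not their values,
    # since values such as the request body may contain user data.
    kwarg_names = sorted(context.function_kwargs)
    return (
        f"{method} {route} user={context.user_id} "
        f"args={len(context.function_args)} kwargs={kwarg_names}"
    )


# Stand-in object with the same attribute names as PluginContext.
example_context = SimpleNamespace(
    request=None,
    response=None,
    user_id="u1",
    metadata={"route": "/v1/chat/completions", "method": "POST"},
    function_args=(),
    function_kwargs={"body": object()},
)
summary = describe_context(example_context)
# summary == "POST /v1/chat/completions user=u1 args=0 kwargs=['body']"
```

Inside a real plugin, the same string could be passed to uvicorn_logger.info() from before_request().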
To keep this example simple, just read the body argument of the endpoint.
Modify before_request() to log the content of the last message.
    async def before_request(self, context: PluginContext) -> PluginContext:
        # "body" is the endpoint's keyword argument holding the request body
        body = context.function_kwargs["body"]
        latest_content = body.messages[-1].content
        uvicorn_logger.info(f"{latest_content=}")
        return context
Using the same curl call as above, the logs now show the message content instead of "Request started.":
INFO: latest_content='Hello!'
INFO: DFAnonymizePlugin: Anonymizing requst
You can also modify the message and return the context for the endpoint to consume. DFAnonymizePlugin does exactly that.
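A minimal sketch of such a modification is shown below: it masks email addresses in the last message before the endpoint sees it. This is not the DFAnonymizePlugin implementation. RedactingPlugin and EMAIL_PATTERN are hypothetical names, the regular expression is deliberately simple, and the sketch assumes that message objects allow their content attribute to be reassigned.

```python
import asyncio
import re
from types import SimpleNamespace
from typing import Any

# Deliberately simple pattern for illustration; real email matching is harder.
EMAIL_PATTERN = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


class RedactingPlugin:
    """Hypothetical plugin body; a real one would subclass BasePlugin."""

    async def before_request(self, context: Any) -> Any:
        body = context.function_kwargs["body"]
        last_message = body.messages[-1]
        # Assumption: the message object is mutable.
        last_message.content = EMAIL_PATTERN.sub("[redacted]", last_message.content)
        return context  # the endpoint receives the modified body


def run_demo() -> str:
    # Stand-ins for the request body and the plugin context.
    body = SimpleNamespace(
        messages=[SimpleNamespace(role="user", content="Mail me at jane@example.com please")]
    )
    context = SimpleNamespace(function_kwargs={"body": body})
    asyncio.run(RedactingPlugin().before_request(context))
    return body.messages[-1].content
```

The key point is that the hook mutates the body held in context.function_kwargs and then returns the same context, so the change travels on to the endpoint.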
Log the result
This is a little more complicated due to the asynchronous nature of the response from the endpoint.
Write an async method parse_stream() that will collect the chunks, join them, and load the concatenated string as JSON. Then find the content in the response dictionary.
    # Requires: import json; from collections.abc import AsyncGenerator; from typing import Any
    async def after_request(self, context: PluginContext, result: StreamingResponse) -> StreamingResponse:
        async def parse_stream() -> AsyncGenerator[Any, None]:
            # Collect every chunk of the body before parsing it
            chunks = []
            async for chunk in result.body_iterator:
                chunks.append(chunk)
            response_text = b"".join(chunks).decode("utf-8")
            response_dict = json.loads(response_text)
            content = response_dict["choices"][0]["message"]["content"]
            uvicorn_logger.info(f"{content=}")
            # Pass the original body on to the client unchanged
            yield response_text

        return StreamingResponse(parse_stream(), media_type=result.media_type, headers=result.headers, status_code=result.status_code)
Running the curl command from above again gives a result similar to the one below:
INFO: DFAnonymizePlugin: Deanonymizing response.
INFO: localhost:59590 - "POST /v1/chat/completions HTTP/1.1" 200 OK
INFO: content="Hello! How can I assist you today? Do you have any questions, need help with something, or just want to chat? I'm here to listen and help in any way I can!"
The content can be modified before it is yielded from parse_stream(). The client then receives it as the response from the endpoint.
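The sketch below shows one way such a modification could look, using only the standard library so it can run without a server. The names rewrite_content, fake_body, and collect are hypothetical, and fake_body stands in for result.body_iterator. It assumes the body is a single JSON document, as with "stream": false in the curl call above.

```python
import asyncio
import json
from collections.abc import AsyncGenerator, AsyncIterator


async def rewrite_content(
    body_iterator: AsyncIterator[bytes], suffix: str
) -> AsyncGenerator[str, None]:
    """Collect the body, append a suffix to the message content, re-serialize."""
    chunks = []
    async for chunk in body_iterator:
        chunks.append(chunk)
    response_dict = json.loads(b"".join(chunks).decode("utf-8"))
    message = response_dict["choices"][0]["message"]
    message["content"] = message["content"] + suffix
    # Yield the modified document instead of the original text.
    yield json.dumps(response_dict)


async def fake_body() -> AsyncGenerator[bytes, None]:
    # Stand-in for result.body_iterator, split into two chunks on purpose.
    payload = json.dumps(
        {"choices": [{"message": {"role": "assistant", "content": "Hello!"}}]}
    ).encode("utf-8")
    yield payload[:10]
    yield payload[10:]


async def collect(stream: AsyncIterator[str]) -> str:
    parts = []
    async for part in stream:
        parts.append(part)
    return "".join(parts)


def run_demo() -> str:
    return asyncio.run(collect(rewrite_content(fake_body(), " (checked by plugin)")))
```

In a plugin, a generator like rewrite_content(result.body_iterator, ...) would take the place of parse_stream() when building the new StreamingResponse. If the copied headers include a Content-Length, it would no longer match the modified body and would need to be updated or dropped.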
Full Code
import json
from collections.abc import AsyncGenerator
from typing import Any

from fastapi.responses import StreamingResponse

from server.core.plugin_manager import BasePlugin, PluginContext
from server.utils.logger import uvicorn_logger


class MyCustomPlugin(BasePlugin):
    async def initialize(self) -> bool:
        return True

    async def before_request(self, context: PluginContext) -> PluginContext:
        # Log the content of the last message in the request body
        body = context.function_kwargs["body"]
        latest_content = body.messages[-1].content
        uvicorn_logger.info(f"{latest_content=}")
        return context

    async def after_request(self, context: PluginContext, result: StreamingResponse) -> StreamingResponse:
        async def parse_stream() -> AsyncGenerator[Any, None]:
            # Collect every chunk of the body before parsing it
            chunks = []
            async for chunk in result.body_iterator:
                chunks.append(chunk)
            response_text = b"".join(chunks).decode("utf-8")
            response_dict = json.loads(response_text)
            content = response_dict["choices"][0]["message"]["content"]
            uvicorn_logger.info(f"{content=}")
            # Pass the original body on to the client unchanged
            yield response_text

        return StreamingResponse(parse_stream(), media_type=result.media_type, headers=result.headers, status_code=result.status_code)