This PR implements the new design of **Pipeline** introduced by #4432.

---

**Two types of pipelines, differentiated by their source**

Type I: **Realtime**
- source is a stream (Logs, Metrics, & Traces)
- takes effect while the source stream is being ingested

Type II: **Scheduled** (aka DerivedStream)
- source is a SQL query
- takes effect based on the given schedule

The new pipeline is represented as a graph consisting of different types of nodes connected by edges.

**Pipeline Data Model**
```rs
pub struct Pipeline {
    pub id: String,
    pub version: i32,
    pub enabled: bool,
    pub org: String,
    pub name: String,
    pub description: String,
    pub source: PipelineSource,
    pub nodes: Vec<Node>,
    pub edges: Vec<Edge>,
}
```

**4 types of nodes**
I. StreamNode: used either as the source node of a realtime pipeline or as a destination node
II. QueryNode: used as the source node of a scheduled pipeline
III. FunctionNode: used for executing VRL functions
IV. ConditionNode: used for checking routing conditions

Rules applied when validating a pipeline at creation time:
1. non-empty nodes list
2. non-empty edges list
3. the 1st node in the nodes list is either a StreamNode or a QueryNode
4. non-empty `conditions` in every ConditionNode in the nodes list
5. all leaf nodes are of type StreamNode
6. within the same branch, a FunctionNode with `after_flattened` unchecked can't follow a FunctionNode with `after_flattened` checked

Pipeline execution is implemented with the struct `ExecutablePipeline`, a ready-to-execute pipeline object cached in memory and built from the Pipeline objects modified on the UI side. An `ExecutablePipeline` object
- builds the relationships among all the nodes based on the edges
- topologically sorts the nodes by level to determine which node to process first
- registers all the VRL function nodes once

An `ExecutablePipeline` object processes ingested records in batches. Starting from the source node, each edge has a channel connecting the parent and child nodes, used for passing records. This approach enables batch processing in parallel.

---

**Deprecated Features**
1. previous implementation of pipeline
2. Function x Stream associations
   - Functions can still be added/edited/removed as before
   - A pipeline must now be created in order to apply chosen functions to the desired stream

When this PR is merged, the new release will automatically migrate **old pipelines** and **Function x Stream associations** to the new pipeline format; the results can be found in the `Stream Pipelines` tab under names with a `Migrated-` prefix.

**_Note: Auto-generated pipelines are paused by default. Please verify the pipelines before enabling them._**

Co-authored-by: Taiming Liu <liutaiming3@gmail.com>
Co-authored-by: Bhargav <BJP232004@GMAIL.COM>
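The level-based topological ordering described above can be illustrated with a minimal, language-agnostic sketch (written here in Python, not the actual Rust implementation; the node names and graph shape are invented for illustration):

```python
from collections import defaultdict, deque

def topo_levels(nodes, edges):
    """Return a topological order of `nodes` plus each node's level
    (longest distance from a source node), given `edges` as (parent, child) pairs."""
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)
        indegree[child] += 1

    level = {n: 0 for n in nodes}
    queue = deque(n for n in nodes if indegree[n] == 0)  # source nodes first
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for c in children[n]:
            # A child's level is one more than its deepest parent.
            level[c] = max(level[c], level[n] + 1)
            indegree[c] -= 1
            if indegree[c] == 0:
                queue.append(c)

    if len(order) != len(nodes):
        raise ValueError("pipeline graph contains a cycle")
    return order, level

# Hypothetical 4-node pipeline: source -> function -> two destination streams.
nodes = ["source", "func", "dest_a", "dest_b"]
edges = [("source", "func"), ("func", "dest_a"), ("func", "dest_b")]
order, level = topo_levels(nodes, edges)
```

In this sketch the source node is processed first, then the function node at level 1, then both destination nodes at level 2; nodes at the same level have no dependency on each other, which is what allows the per-edge channels to run branches in parallel.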
# pytest-openobserve

This is a repo to test OpenObserve API endpoints.

## Introduction

The tests run using pytest against alpha1 for now. The idea is to parametrize them for other environments, such as production, as well.

## Getting Started

We use [rye](https://github.com/mitsuhiko/rye) to manage the Python version and environment. Once installed, simply run the following steps:

```
rye sync
rye run pytest
```

## TODOs

- Parametrize tests to run against different environments
- Run the tests as a cron job every few hours, daily, etc. for different environments
- Send a Slack notification (using GitHub Actions) in case something fails on the alpha1 or production endpoints
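The first TODO could be approached with pytest's built-in parametrization; a minimal sketch, assuming hypothetical base URLs (the real endpoints would come from the repo's configuration):

```python
import pytest

# Hypothetical base URLs for illustration only; real values would be
# supplied via configuration or CLI options.
ENVIRONMENTS = {
    "alpha1": "https://alpha1.example.com",
    "production": "https://prod.example.com",
}

@pytest.mark.parametrize("env", sorted(ENVIRONMENTS))
def test_base_url_is_https(env):
    """Run once per environment; pytest generates one test case per param."""
    base_url = ENVIRONMENTS[env]
    # Placeholder assertion; a real test would issue an HTTP request here.
    assert base_url.startswith("https://")
```

With this pattern, `rye run pytest` would report each environment as a separate test case, and a failing environment would be visible by name in the output.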