🧙 The modern replacement for Airflow. Build, run, and manage data pipelines for integrating and transforming data.
Add conditional blocks to Mage. The conditional block is an "Add-on" block that can be added to an existing block within a pipeline. If the conditional block evaluates to False, the parent block will not be executed.
Doc: https://docs.mage.ai/development/blocks/conditionals/overview
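A minimal sketch of a conditional block, following the pattern in the doc above (the @condition decorator is Mage's; the environment check itself is hypothetical):
```python
if 'condition' not in globals():
    from mage_ai.data_preparation.decorators import condition


@condition
def evaluate_condition(*args, **kwargs) -> bool:
    # Return False to skip executing the parent block.
    # Hypothetical check: only run the parent block in the 'prod' environment.
    return kwargs.get('env') == 'prod'
```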
For standard pipelines (not currently supported in integration or streaming pipelines), you can save the output of a block that has been run as a CSV file. You can save the block output on the Pipeline Editor page or the Block Runs page.
Doc: https://docs.mage.ai/orchestration/pipeline-runs/saving-block-output-as-csv
Mage supports customizing the Spark session for a pipeline by specifying the spark_config in the pipeline metadata.yaml file. The pipeline-level spark_config will override the project-level spark_config if specified.
Doc: https://docs.mage.ai/integrations/spark-pyspark#custom-spark-session-at-the-pipeline-level
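For example, a pipeline-level override might look like this in the pipeline's metadata.yaml (a sketch reusing the spark_config keys shown later in this changelog; the values are illustrative):
```yaml
# pipeline metadata.yaml — overrides the project-level spark_config
spark_config:
  app_name: 'my pipeline spark app'
  spark_master: 'yarn'
```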
Doc: https://github.com/mage-ai/mage-ai/tree/master/mage_integrations/mage_integrations/sources/api
Users can customize the notification templates of different channels (Slack, email, etc.) in the project metadata.yaml. Here are the supported variables that can be interpolated in the message templates: execution_time, pipeline_run_url, pipeline_schedule_id, pipeline_schedule_name, pipeline_uuid
Example config in the project's metadata.yaml:
```yaml
notification_config:
  slack_config:
    webhook_url: "{{ env_var('MAGE_SLACK_WEBHOOK_URL') }}"
  message_templates:
    failure:
      details: >
        Failure to execute pipeline {pipeline_run_url}.
        Pipeline uuid: {pipeline_uuid}. Trigger name: {pipeline_schedule_name}.
        Test custom message.
```
Doc: https://docs.mage.ai/production/observability/alerting-slack#customize-message-templates
Mage stores orchestration data, user data, and secrets data in a database. In addition to SQLite and Postgres, Mage now supports using MSSQL and MySQL as the database engine.
MSSQL docs:
MySQL docs:
Mage now supports connecting to MinIO and Wasabi by specifying the AWS_ENDPOINT field in the S3 config.
Doc: https://docs.mage.ai/integrations/databases/S3#minio-support
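A sketch of an io_config.yaml S3 profile pointing at MinIO (the AWS_ENDPOINT key comes from the doc above; the endpoint URL and credential interpolation are illustrative):
```yaml
version: 0.1.1
default:
  AWS_ACCESS_KEY_ID: "{{ env_var('AWS_ACCESS_KEY_ID') }}"
  AWS_SECRET_ACCESS_KEY: "{{ env_var('AWS_SECRET_ACCESS_KEY') }}"
  AWS_ENDPOINT: https://minio.example.com  # hypothetical MinIO endpoint
```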
To maximize block reuse, you can use dynamic and replica blocks in combination.
CREATE SCHEMA IF NOT EXISTS is not supported by MSSQL, so a default command was provided in BaseSQL -> build_create_schema_command, with an overridden implementation in MSSQL -> build_create_schema_command containing compatible syntax. (Kudos to gjvanvuuren.)
Fix kwargs passing so that RabbitMQ messages can be acknowledged correctly.
Support reusing the same block multiple times in a single pipeline.
Doc: https://docs.mage.ai/design/blocks/replicate-blocks
Support running Spark code on a Yarn cluster with Mage.
Doc: https://docs.mage.ai/integrations/spark-pyspark#hadoop-and-yarn-cluster-for-spark
Mage supports configuring automatic retries for block runs in the following ways:
Add retry_config to the project's metadata.yaml. This retry_config will be applied to all block runs.
Add retry_config to the block config in the pipeline's metadata.yaml. The block-level retry_config will override the global retry_config.
Example config:
```yaml
retry_config:
  # Number of retry times
  retries: 0
  # Initial delay before retry. If exponential_backoff is true,
  # the delay time is multiplied by 2 for the next retry
  delay: 5
  # Maximum time between the first attempt and the last retry
  max_delay: 60
  # Whether to use exponential backoff retry
  exponential_backoff: true
```
Doc: https://docs.mage.ai/orchestration/pipeline-runs/retrying-block-runs#automatic-retry
When running a DBT block with language YAML, interpolate and merge the user-defined --vars in the block's code into the variables that Mage automatically constructs. Examples:
```
--select demo/models --vars '{"demo_key": "demo_value", "date": 20230101}'
--select demo/models --vars {"demo_key":"demo_value","date":20230101}
--select demo/models --vars '{"global_var": {{ test_global_var }}, "env_var": {{ test_env_var }}}'
--select demo/models --vars {"refresh":{{page_refresh}},"env_var":{{env}}}
```
Support custom project names in dbt_project.yml and custom profile names that are different from the DBT folder name.
Allow users to configure a block to run DBT snapshots.
Support using dynamic child blocks for SQL blocks
Doc: https://docs.mage.ai/design/blocks/dynamic-blocks#dynamic-sql-blocks
If your Mage app is deployed on Microsoft Azure with Mage’s terraform scripts, you can choose to launch separate Azure container instances to execute blocks.
The mage start command also supports running the scheduler and web server as separate services via the --instance-type flag:
```
mage start project_name --instance-type scheduler
mage start project_name --instance-type web_server
mage start project_name --instance-type server_and_scheduler
```
Support “Add”, “Rename”, “Move”, and “Delete” operations on folders.
Allow specifying an envs value to apply triggers only in certain environments.
Example:
```yaml
triggers:
- name: test_example_trigger_in_prod
  schedule_type: time
  schedule_interval: "@daily"
  start_time: 2023-01-01
  status: active
  envs:
  - prod
- name: test_example_trigger_in_dev
  schedule_type: time
  schedule_interval: "@hourly"
  start_time: 2023-03-01
  status: inactive
  settings:
    skip_if_previous_running: true
    allow_blocks_to_fail: true
  envs:
  - dev
```
Doc: https://docs.mage.ai/guides/triggers/configure-triggers-in-code#create-and-configure-triggers
Add indices to schedule models to speed up DB queries.
Fix the “too many open files” issue: add the ULIMIT_NO_FILE environment variable to increase the maximum number of open files in Mage deployed on AWS, GCP, and Azure.
Fix the git_branch resource blocking page loads. The git clone command could cause the entire app to hang if the host wasn't added to known hosts; the git clone command now runs as a separate process with a timeout, so it won't block the entire app if it gets stuck.
Fix bug: when adding a block in between blocks in a pipeline with two separate root nodes, the downstream connections are removed.
Fix DBT error KeyError: 'file_path': check for file_path before calling the parse_attributes method to avoid the KeyError.
Improve the coding experience when working with Snowflake data provider credentials. Allow more flexibility in Snowflake SQL block queries.
Doc: https://docs.mage.ai/integrations/databases/Snowflake#methods-for-configuring-database-and-schema
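For example, a data loader block can defer the database and schema to io_config.yaml; a sketch following Mage's standard Snowflake loader template (the query and profile are illustrative):
```python
from os import path

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_snowflake(*args, **kwargs):
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    with Snowflake.with_config(ConfigFileLoader(config_path, 'default')) as loader:
        # Illustrative query; database/schema can come from the io_config profile.
        return loader.load('SELECT * FROM my_table LIMIT 10')
```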
Pass parent block’s output and variables to its callback blocks.
Fix missing input field and select field descriptions in charts.
Fix bug: Missing values template chart doesn’t render.
Convert numpy.ndarray to list if the column type is list when fetching input variables for blocks.
Fix runtime and global variables not available in the keyword arguments when executing block with upstream blocks from the edit pipeline page.
View full Changelog
More complex streaming pipelines are supported in Mage now. You can use more than one transformer and more than one sink in a streaming pipeline.
Here is an example streaming pipeline with multiple transformers and sinks.
Doc for streaming pipeline: https://docs.mage.ai/guides/streaming/overview
Allow using a custom Spark configuration to create the Spark session used in the pipeline.
```yaml
spark_config:
  # Application name
  app_name: 'my spark app'
  # Master URL to connect to
  # e.g., spark_master: 'spark://host:port', or spark_master: 'yarn'
  spark_master: 'local'
  # Executor environment variables
  # e.g., executor_env: {'PYTHONPATH': '/home/path'}
  executor_env: {}
  # Jar files to be uploaded to the cluster and added to the classpath
  # e.g., spark_jars: ['/home/path/example1.jar']
  spark_jars: []
  # Path where Spark is installed on worker nodes,
  # e.g. spark_home: '/usr/lib/spark'
  spark_home: null
  # List of key-value pairs to be set in SparkConf
  # e.g., others: {'spark.executor.memory': '4g', 'spark.executor.cores': '2'}
  others: {}
```
Doc for running PySpark pipeline: https://docs.mage.ai/integrations/spark-pyspark#standalone-spark-cluster
New data integration source DynamoDB is added.
Use timestamptz as the data type for datetime columns in the Postgres destination.
Improve Mage's file editor so that users can edit files without going into a pipeline.
Mage uses Polars to speed up writing block output (DataFrame) to disk, reducing the time of fetching and writing a DataFrame with 2 million rows from 90s to 15s.
Mage automatically adds a default .gitignore file when initializing a project:
```
.DS_Store
.file_versions
.gitkeep
.log
.logs/
.preferences.yaml
.variables/
__pycache__/
docker-compose.override.yml
logs/
mage-ai.db
mage_data/
secrets/
```
Fix TypeError: Instance and class checks can only be used with @runtime protocols.
View full Changelog
Add code templates to fetch data from and export data to MongoDB.
Example MongoDB config in io_config.yaml:
```yaml
version: 0.1.1
default:
  MONGODB_DATABASE: database
  MONGODB_HOST: host
  MONGODB_PASSWORD: password
  MONGODB_PORT: 27017
  MONGODB_COLLECTION: collection
  MONGODB_USER: user
```
Data loader template
Data exporter template
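A rough sketch of what the data loader template looks like, assuming a MongoDB IO class that mirrors the other mage_ai.io connectors (the load arguments are hypothetical):
```python
from os import path

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mongodb import MongoDB

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_from_mongodb(*args, **kwargs):
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    return MongoDB.with_config(ConfigFileLoader(config_path, 'default')).load(
        query={},  # hypothetical filter: load all documents from the configured collection
    )
```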
Support renv for R blocks. renv is installed in the Mage docker image by default. Users can use the renv package to manage R dependencies for their projects.
Doc for the renv package: https://cran.r-project.org/web/packages/renv/vignettes/renv.html
Support running streaming pipelines with the k8s executor to scale up streaming pipeline execution. It can be configured in the pipeline metadata.yaml with the executor_type field. Here is an example:
```yaml
blocks:
- ...
- ...
executor_count: 1
executor_type: k8s
name: test_streaming_kafka_kafka
uuid: test_streaming_kafka_kafka
```
When cancelling the pipeline run in Mage UI, Mage will kill the k8s job.
Support running Spark DBT models in Mage. Currently, only the connection method session is supported.
Follow this doc to set up the Spark environment in Mage. Follow the instructions in https://docs.mage.ai/tutorials/setup-dbt to set up DBT. Here is an example DBT Spark profiles.yml:
```yaml
spark_demo:
  target: dev
  outputs:
    dev:
      type: spark
      method: session
      schema: default
      host: local
```
Update the multi-development environment to go through the user authentication flow. The multi-development environment is used to manage development instances in the cloud.
Doc for multi-development environment: https://docs.mage.ai/developing-in-the-cloud/cloud-dev-environments/overview
Shout out to Joseph Corrado for his contribution of adding pre-commit hooks to Mage to run code checks before committing and pushing the code.
Doc: https://github.com/mage-ai/mage-ai/blob/master/README_dev.md
Shout out to hjhdaniel for his contribution of adding the method for deleting secret keys to Mage.
Example code:
```python
from mage_ai.data_preparation.shared.secrets import delete_secret

delete_secret('secret_name')
```
If a block is selected in an integration pipeline to retry block runs, only the block runs for the selected block's stream will be run.
Mage now automatically retries blocks twice on failures (3 total attempts).
Display error popup with link to docs for “too many open files” error.
Fix the DBT block limit input field: the limit entered through the UI wasn't taking effect when previewing model results. It now applies, with a default limit of 1000.
Fix BigQuery table id issue for batch load.
Fix unique conflict handling for BigQuery batch load.
Remove startup_probe in GCP cloud run executor.
Fix run command for AWS and GCP job runs so that job run logs can be shown in Mage UI correctly.
Pass block configuration to kwargs in the method.
Fix SQL block execution when using different schemas between upstream block and current block.
View full Changelog
Support using Polars DataFrame in Mage blocks.
Shout out to Sergio Santiago for his contribution of integrating Opsgenie as an alerting option in Mage.
Doc: https://docs.mage.ai/production/observability/alerting-opsgenie
Add support for using batch load jobs instead of the query API in the BigQuery destination. You can enable it by setting use_batch_load to true in the BigQuery destination config.
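In a data integration pipeline, enabling it is a one-line addition to the BigQuery destination config (a sketch; the other required destination settings are elided):
```yaml
# BigQuery destination config (sketch)
# ...other required destination settings...
use_batch_load: true  # use batch load jobs instead of the query API
```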
When loading ~150MB of data to BigQuery, using batch loading reduces the time from 1 hour to around 2 minutes (a ~30x speedup).
io_config.yaml
Support using git switch to switch branches.
Add another value to the DISABLE_NOTEBOOK_EDIT_ACCESS environment variable to allow users to create secrets, create variables, and run blocks. The available values are listed in the doc below.
Doc: https://docs.mage.ai/production/configuring-production-settings/overview#read-only-access
For standard Python pipelines, you can retry block runs from a selected block. The selected block and all downstream blocks will be re-run after clicking the Retry from selected block button.
Fix terminal user authentication: terminal authentication now happens per message.
Fix a potential authentication issue for the Google Cloud PubSub publisher client.
Dependency graph improvements
DBT: fix the limit property in the DBT block PUT request payload.
Retry pipeline run.
Fix bug: when Mage fails to fetch a pipeline due to a backend exception, it doesn't show the actual error and instead uses "undefined" in the pipeline URL, which makes the issue hard to debug.
Improve job scheduling: If jobs with QUEUED status are not in queue, re-enqueue them.
Pass imagePullSecrets to the k8s job when using k8s as the executor.
Fix streaming pipeline cancellation.
Fix the version of google-cloud-run package.
Fix query permissions for the block resource.
Catch sqlalchemy.exc.InternalError in the server and roll back the transaction.
View full Changelog
Added Markdown block to Pipeline Editor.
Doc: https://docs.mage.ai/guides/blocks/markdown-blocks
Doc: https://docs.mage.ai/production/data-sync/git#https-token-authentication
Doc: https://docs.mage.ai/development/blocks/callbacks/overview
Make callback blocks more generic and support them in data integration pipelines.
Keyword arguments available in data integration pipeline callback blocks: https://docs.mage.ai/development/blocks/callbacks/overview#data-integration-pipelines-only
Support bulk retrying pipeline runs for a pipeline.
Add a right-click context menu to rows on the pipeline list page for pipeline actions (e.g., rename).
When hovering over the left and right vertical navigation, expand it to show navigation titles, like BigQuery's UI.
Doc: https://docs.mage.ai/development/testing/great-expectations#json-object
Doc: https://docs.mage.ai/dbt/incremental-models
Shout out to André Ventura for his contribution of adding the Google Cloud Storage destination to data integration pipeline.
Shout out to Dhia Eddine Gharsallaoui again for his contribution of adding Druid data source to Mage.
Doc: https://docs.mage.ai/integrations/databases/Druid
Use the COPY command in the mage_ai.io.postgres.Postgres export method to speed up writing data to Postgres.
Doc: https://docs.mage.ai/guides/streaming/sources/google-cloud-pubsub
Set the environment variable DEFAULT_EXECUTOR_TYPE to k8s to use the k8s executor by default for all pipelines.
Doc: https://docs.mage.ai/production/configuring-production-settings/compute-resource#2-set-executor-type-and-customize-the-compute-resource-of-the-mage-executor
Add k8s_executor_config to the project's metadata.yaml to apply the config to all blocks that use the k8s executor in this project (see the sketch below).
Doc: https://docs.mage.ai/production/configuring-production-settings/compute-resource#kubernetes-executor
Allow specifying GPU resources in k8s_executor_config.
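A sketch of the project-level config, using the resource keys from the compute-resource doc above (the values are illustrative; GPU-specific keys are omitted):
```yaml
# project metadata.yaml
k8s_executor_config:
  namespace: default
  resource_limits:
    cpu: 1000m
    memory: 2048Mi
  resource_requests:
    cpu: 500m
    memory: 1024Mi
```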
Fix service account permissions for creating Kubernetes jobs by not using default as the service account namespace in the Helm chart.
Doc for deploying with Helm: https://docs.mage.ai/production/deploying-to-cloud/using-helm
MAGE_PUBLIC_HOST
View full Changelog
Provide a code template to trigger another pipeline from a block within a different pipeline.
Doc: https://docs.mage.ai/orchestration/triggers/trigger-pipeline
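The template centers on Mage's trigger_pipeline helper; a minimal sketch (the target pipeline UUID and variables are hypothetical):
```python
from mage_ai.orchestration.triggers.api import trigger_pipeline

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom


@custom
def trigger(*args, **kwargs):
    trigger_pipeline(
        'example_target_pipeline',   # hypothetical UUID of the pipeline to trigger
        variables={'key': 'value'},  # hypothetical runtime variables
        check_status=True,           # poll until the triggered run completes
        error_on_failure=True,       # raise if the triggered run fails
        verbose=True,
    )
```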
Doc: https://docs.mage.ai/guides/streaming/destinations/mongodb
Mage supports two ways to delete messages after consuming them from an SQS queue; see the doc below for the message deletion methods.
Doc: https://docs.mage.ai/guides/streaming/sources/amazon-sqs#message-deletion-method
Set the executor_count variable in the pipeline's metadata.yaml file to run multiple executors at the same time to scale streaming pipeline execution.
Doc: https://docs.mage.ai/guides/streaming/overview#run-pipeline-in-production
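For example (a minimal sketch; the count is illustrative):
```yaml
# pipeline metadata.yaml
executor_count: 2  # run two executors concurrently for this streaming pipeline
```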
Added pagination to Triggers and Block Run pages
After pulling code from the git repository to local, Mage automatically installs the libraries in requirements.txt so that the pipelines can run successfully without manual installation of packages.
Allow setting the table names for upstream blocks when using SQL blocks.
Add connect_timeout to the PostgreSQL IO.
Add location to the BigQuery IO.
Strip the .sql extension from the DBT model name if the user includes it, and display a trailing .sql suffix in the name input to emphasize that the .sql extension should not be included.
Fix the onSuccess callback logging issue.
Fix the mage run command: set repo_path before initializing the DB so that the correct db_connection_url can be obtained.
Fix ModuleNotFoundError: No module named 'aws_secretsmanager_caching' when running a pipeline from the command line.
View full Changelog
Support using SQL block to fetch data from, transform data in and export data to ClickHouse.
Doc: https://docs.mage.ai/integrations/databases/ClickHouse
Support using SQL block to fetch data from, transform data in and export data to Trino.
Doc: https://docs.mage.ai/development/blocks/sql/trino
Enable the Sentry integration to track and monitor exceptions in the Sentry dashboard.
Doc: https://docs.mage.ai/production/observability/sentry
Mage now supports dragging and dropping blocks to re-order blocks in pipelines.
Support consuming messages from SQS queues in streaming pipelines.
Doc: https://docs.mage.ai/guides/streaming/sources/amazon-sqs
The Dummy sink optionally prints the message and then discards it. This dummy sink is useful when users want to trigger other pipelines or 3rd-party services using the ingested data in a transformer.
Doc: https://docs.mage.ai/guides/streaming/destinations/dummy
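A sketch of the streaming destination config (connector_type per the doc above; the print toggle key name is an assumption):
```yaml
# streaming pipeline destination config (sketch)
connector_type: dummy
print_msg: true  # assumed key name: print each message before discarding it
```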
Add code templates to fetch data from and export data to Delta Lake.
Support writing unit tests for Mage pipelines that run in the CI/CD pipeline using mock data.
Doc: https://docs.mage.ai/development/testing/unit-tests
Fix bug: if a file is created with a double .sql extension, the wrong file may get deleted when you try to delete the file with the double .sql extension.
Handle +schema in the DBT profile.
Use the io_config.yaml database and schema by default.
Fix an issue that occurs when the Git feature is used.
View full Changelog
In addition to configuring triggers in the UI, Mage also supports configuring triggers in code now. Create a triggers.yaml file under your pipeline folder and enter the trigger configs. The triggers will automatically be synced to the DB and the trigger UI.
Doc: https://docs.mage.ai/guides/triggers/configure-triggers-in-code
Shout out to Dhia Eddine Gharsallaoui for his contribution of centralizing the server logging and adding verbosity control. Users can control the verbosity level of the server logging by setting the SERVER_VERBOSITY environment variable. For example, you can set the SERVER_VERBOSITY environment variable to ERROR to only print out errors.
Doc: https://docs.mage.ai/production/observability/logging#server-logging
Users can now customize the resources of the Kubernetes executor by adding executor_config to the block config in the pipeline's metadata.yaml (see the sketch below).
Doc: https://docs.mage.ai/production/configuring-production-settings/compute-resource#kubernetes-executor
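A sketch of a block-level override in the pipeline's metadata.yaml (the block uuid and resource values are hypothetical; key names follow the doc above):
```yaml
blocks:
- uuid: example_transformer  # hypothetical block uuid
  executor_type: k8s
  executor_config:
    resource_limits:
      cpu: 1000m
      memory: 2048Mi
```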
Fix the handling of previously selected streams that have since been deleted or renamed (previously left as an empty {} in the schema): such a stream will still appear in the SelectStreams modal but will automatically be deselected, with red font indicating that the stream is no longer available. The user needs to click "Confirm" to remove the deleted stream from the schema.
Use the cmd shell command for Windows instead of bash. Allow users to overwrite the shell command with the SHELL_COMMAND environment variable.
 environment variable.with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
loader.export(
df,
schema_name,
table_name,
index=False,
if_exists='append',
allow_reserved_words=True,
unique_conflict_method='UPDATE',
unique_constraints=['col'],
)
View full Changelog
The terminal experience is improved in this release, which adds new interactive features and boosts performance. Now, you can use the following interactive commands and more:
```
git add -p
dbt init demo
great_expectations init
```
Shout out to Luis Salomão for adding the Google Ads source.
Use DOUBLE PRECISION instead of DECIMAL as the column type for float/double numbers.
Doc: https://docs.mage.ai/guides/streaming/destinations/amazon-s3
Enable the logging of custom exceptions in the transformer of a streaming pipeline. Here is an example code snippet:
```python
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    try:
        raise Exception('test')
    except Exception as err:
        kwargs['logger'].error('Test exception', error=err)
    return messages
```
Support cancelling a running streaming pipeline (when the pipeline is executed in the Pipeline Editor) after the page is refreshed.
Shout out to Tim Ebben for adding the option to send alerts to Google Chat in the same way as Teams/Slack using a webhook.
Example config in the project’s metadata.yaml:
```yaml
notification_config:
  alert_on:
  - trigger_failure
  - trigger_passed_sla
  slack_config:
    webhook_url: ...
```
How to create webhook url: https://developers.google.com/chat/how-tos/webhooks#create_a_webhook
Prevent a user from editing a pipeline if it’s stale. A pipeline can go stale if there are multiple tabs open trying to edit the same pipeline or multiple people editing the pipeline at different times.
Fix bug: Code block scrolls out of view when focusing on the code block editor area and collapsing/expanding blocks within the code editor.
Fix bug: Sync UI is not updating the "rows processed" value.
Fix the path issue of running dynamic blocks on a Windows server.
Fix index out of range error in data integration transformer when filtering data in the transformer.
Fix issues of loading sample data in Google Sheets.
Fix chart blocks loading data.
Fix Git integration bugs:
Add preventive measures for saving a pipeline:
DBT block: fix the Circular reference detected issue with DBT variables.
SQL block
Add helper for using CRON syntax in trigger setting.
View full Changelog