Big Data Real-Time Data Pipeline Architecture
Hybrid data warehouses can both ingest and process data in real-time as streams and store and query this data in table formats. This dual functionality allows for low latency and high throughput in data processing, accommodating both streaming and batch analytics. Examples of such hybrid data warehouses include Apache Druid and Delta Lake. These technologies employ various methods like columnar storage, indexing, caching, and concurrency control to facilitate real-time data warehousing. Nonetheless, depending on their specific implementation and the use case, they may present complexity, reliability, or consistency challenges.

As real-time data becomes increasingly critical in data engineering and analytics, choosing an appropriate data warehouse technology hinges on multiple factors. These include the data's volume, velocity, variety, and value, business needs, budget constraints, and available expertise. A thorough understanding of the strengths and limitations of each option can guide you in making a well-informed decision for constructing a robust and efficient data warehouse tailored to your real-time data requirements.

What Is Apache Druid?

Apache Druid is an open-source analytics database designed for high-performance real-time analytics. It's particularly well-suited for business intelligence (OLAP) queries on event data. Druid is commonly used in environments where real-time insights into large-scale data are crucial, such as e-commerce, financial services, and digital advertising.

Key Features of Apache Druid

Real-Time Analytics: Druid excels at providing fast analytics on data as it's being ingested, enabling immediate insights into data streams. It offers rapid query execution across distributed systems and high-capacity data ingestion, ensuring low latency. It excels in processing various event data types, including clickstream, IoT data, and event data recorders (such as those used in Tesla vehicles).

Scalability: Designed for scalability, it efficiently handles large volumes of data and can be scaled up to meet increased demand.

Low Latency: Druid is optimized for low-latency queries, making it ideal for interactive applications where quick response times are critical.

High Throughput Ingestion: It can ingest massive amounts of event data with high throughput, making it suitable for applications like clickstream analytics, network monitoring, and fraud detection.

Flexible Data Aggregation: It supports quick and flexible data aggregations, essential for summarizing and analyzing large datasets, and facilitates quick data slicing, dicing, and aggregation queries.

Distributed Architecture: Its distributed architecture allows for robust fault tolerance and high availability, distributing data and query load across multiple servers.

Columnar Storage: It uses a columnar storage format, which enhances performance for analytic queries.

Time-Partitioned Data: It boasts a robust architecture featuring time-based sharding, partitioning, column-oriented storage, indexing, data compression, and maintaining versioned, materialized views for high availability.

Druid is often chosen for its ability to provide immediate insights, supporting both real-time and batch processing, and its robust scalability, making it a favorable choice for organizations needing to analyze large amounts of event-driven data quickly.

Fig 1. Data Analytics Landscape
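Before looking at the architecture, here is a minimal sketch of what an analytical query against Druid looks like from Python, using Druid's SQL-over-HTTP API. The router address, the "clickstream" datasource, and the user_id column are assumptions for illustration; adjust them to your cluster.

Python
# Minimal sketch: issuing a Druid SQL query over HTTP with the requests library.
# "localhost:8888" (router), the "clickstream" datasource, and "user_id" are assumed names.
import requests

DRUID_SQL_URL = "http://localhost:8888/druid/v2/sql/"

query = """
SELECT FLOOR(__time TO HOUR) AS hour_bucket,
       COUNT(*) AS events,
       APPROX_COUNT_DISTINCT(user_id) AS unique_users
FROM clickstream
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY 1
ORDER BY 1
"""

response = requests.post(DRUID_SQL_URL, json={"query": query}, timeout=30)
response.raise_for_status()

for row in response.json():   # Druid returns a JSON array of result rows by default
    print(row["hour_bucket"], row["events"], row["unique_users"])

The APPROX_COUNT_DISTINCT call is an example of the approximation features discussed later in this article: it trades a little accuracy for much faster aggregation over large event streams.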
Fig 2. Typical data ingestion, storage, and data serving layers using Druid

How Druid Operates

Its architecture is resilient and scalable, optimized for OLAP (Online Analytical Processing) with data formats designed for efficient analysis. Operations are massively parallelized, ensuring resource-aware processing during query execution and data ingestion. Druid allows for simultaneous ingestion of both batch and real-time data. Support for pre-fetch operations facilitates querying in under a second. Data tiering in Druid allows for the strategic utilization of infrastructure resources. It isolates long-running queries, ensuring they don't interfere with other operations.

Key Components of Druid

Coordinator: The Druid Coordinator plays a crucial role in data distribution and management. It is responsible for distributing data into Druid deep storage. It is also responsible for distributing copies of data to Historical nodes, significantly enhancing query responses' efficiency and speed. By ensuring that data is appropriately populated into Historical nodes, the Druid Coordinator effectively reduces latency, thereby facilitating high-speed queries.

Overlord: The Druid Overlord is a key component in Apache Druid's architecture, primarily responsible for managing and coordinating data ingestion. Its primary functions include:

Task Management: The Overlord oversees the assignment and supervision of data ingestion tasks, which can be either real-time or batch. It ensures these tasks are distributed and executed efficiently across the available resources.

Scalability: It plays a crucial role in scaling the ingestion process, handling varying loads by dynamically assigning tasks to Middle Manager nodes.

Fault Tolerance: In case of task failures, the Overlord is responsible for detecting these issues and reassigning the tasks to ensure continuous and reliable data ingestion.

Load Balancing: The Overlord also manages the load on Druid's Middle Manager nodes, ensuring an even distribution of tasks for optimal performance.

Router: The Druid Router is responsible for receiving queries from clients and directing them to the appropriate query-serving nodes, such as Broker nodes or directly to Historical nodes, depending on the query type and configuration.

Broker: The Druid Broker is a critical component of the Apache Druid architecture, focusing on query processing and distribution. When a query is submitted to Druid, the Broker plays the central role in aggregating the results from various data nodes. It sends parts of the query to these nodes and then combines their results to form the final response. The Broker node knows the data segments' locations within the cluster. It routes queries intelligently to the nodes containing the relevant data segments, optimizing the query execution process for efficiency and speed. Brokers can also cache query results, which helps speed up the response time for frequent queries, as it avoids reprocessing the same data repeatedly. In summary, the Druid Broker is pivotal in orchestrating query processing within a Druid cluster, ensuring efficient query execution, result aggregation, and load balancing to optimize the performance and scalability of the system.

Historicals: Druid Historical nodes are key components in the Apache Druid architecture, specifically designed for efficient data storage and retrieval. Here are their main characteristics:

Single-Threaded Segment Processing: In Druid Historical nodes, each data segment is processed by a single thread.
This approach simplifies the processing model and helps in the efficient utilization of system resources for querying and data retrieval.

Automatic Tiering: Historical nodes support automatic tiering of data. Data can be categorized into different tiers based on usage or other criteria. This tiering helps optimize the storage and query performance, as frequently accessed data can be placed on faster, more accessible tiers.

Data Management by Coordinator: The Druid Coordinator moves data into the appropriate tier within the Historical nodes. It manages data placement and ensures data is stored on the right tier, balancing load and optimizing storage utilization.

Memory Mapping: Historical nodes use memory-mapped files for data storage. Memory mapping allows these nodes to leverage the operating system's virtual memory for data management, leading to efficient data access and reduced I/O overhead for queries.

In essence, Druid Historical nodes are specialized for reliable and efficient long-term data storage and retrieval, with capabilities like single-threaded processing, automatic tiering, coordinator-led data management, and memory mapping to enhance performance.

Middle Manager: The Druid Middle Manager is crucial in Apache Druid's data ingestion process. Middle Managers are pivotal in the data ingestion pipeline of Druid, handling the distribution and execution of ingestion tasks while ensuring scalability and efficient resource management.

Data Ingestion Management: Middle Managers are responsible for managing data ingestion into the Druid system. They handle both real-time and batch data ingestion tasks.

Task Distribution: Each Middle Manager node can run one or more tasks that ingest data. These tasks are assigned and monitored by the Druid Overlord, which distributes the ingestion workload among available Middle Managers.

Scalability: The architecture of Middle Managers allows for horizontal scalability. As data ingestion demands increase, more Middle Manager nodes can be added to the system to distribute the load effectively.

Real-Time Data Processing: In the case of real-time data ingestion, Middle Managers are involved in initial data processing and handoff to Historical nodes for long-term storage.

Worker Nodes: Middle Managers act as worker nodes. They execute the tasks assigned by the Overlord, which can include data indexing, processing, and temporary storage.

Fig 3. Druid Middle Manager

SQL-Based Ingestion (An Example):

SQL
INSERT INTO tbl
SELECT
  TIME_PARSE("timestamp") AS __time,
  XXX,
  YYY,
  ZZZ
FROM TABLE(
  EXTERN(
    '{"type": "s3", "uris": ["s3://bucket/file"]}',
    '{"type": "json"}',
    '[{"name": "XXX", "type": "string"}, {"name": "YYY", "type": "string"}, {"name": "ZZZ", "type": "string"}, {"name": "timestamp", "type": "string"}]'
  )
)
PARTITION BY FLOOR(__time TO DAY)
CLUSTER BY XXX

JSON-Based Ingestion (An Example):

Fig 4. Example of JSON-based ingestion

Fig 5. Basic functionality of Deep Storage

Deep Storage: Deep storage in Apache Druid is a scalable and durable data storage system for permanent data retention. Deep storage in Druid provides a robust, scalable, and durable solution crucial for maintaining data integrity and availability in large-scale data analytics and business intelligence operations.

Permanent Storage Layer: Deep storage acts as the primary data repository for Druid, where all the ingested data is stored for long-term retention. This is crucial for ensuring data persistence beyond the lifetime of the individual Druid processes.
Support for Various Storage Systems: Druid is designed to be agnostic to the underlying storage system. It can integrate with deep storage solutions like Amazon S3, Google Cloud Storage, Hadoop Distributed File System (HDFS), and Microsoft Azure Storage.

Data Segmentation: Data in deep storage is organized into segments, essentially partitioned, compressed, and indexed files. This segmentation aids in efficient data retrieval and querying.

Fault Tolerance and Recovery: Deep storage provides the resilience to recover and reload data segments in the event of a system failure. This ensures that data is not lost and can be accessed consistently.

Scalability: Deep storage scales independently of the compute resources. As data grows, deep storage can be expanded without impacting the performance of the Druid cluster.

Decoupling of Storage and Processing: Druid allows for flexible and cost-effective resource management by separating storage and processing. Compute resources can be scaled up or down as needed, independent of the data volume in deep storage.

Data Backup and Archival: Deep storage also serves as a backup and archival solution, ensuring that historical data is preserved and can be accessed for future analysis.

Segments in Deep Storage

Segments in deep storage within Apache Druid have distinct characteristics that optimize storage efficiency and query performance. Each segment typically contains between 3 and 5 million rows of data. This size strikes a balance: granular enough for efficient data processing, yet large enough to ensure good compression and query performance. Data within a segment is partitioned based on time. This time-partitioning is central to Druid's architecture, as it allows for efficient handling and querying of time-series data. Within a segment, data can be clustered by dimension values. This clustering enhances the performance of queries that filter or aggregate data based on these dimensions. Once created, segments are immutable – they do not change. Each segment is versioned, enabling Druid to maintain different versions of the same data. This immutability and versioning are crucial for effective caching, as the cache remains valid until the segment is replaced or updated. Segments in Druid are self-describing, meaning they contain metadata about their structure and schema. This feature is important for schema evolution, as it allows Druid to understand and process segments even when the schema changes over time. These aspects of segment design in Druid are essential for its high-performance analytics capabilities, especially in handling large volumes of time-series data, optimizing query performance, and ensuring data consistency and reliability.

Key Features of Segments

Columnar Format: The data in deep storage is stored in a columnar format. This means each column of data is stored separately, enhancing query performance, especially for analytics and aggregation queries, as only the necessary columns need to be read and processed.

Dictionary Encoding: Dictionary encoding is used to store data efficiently. It involves creating a unique dictionary of values for a column, where a compact identifier replaces each value. This approach significantly reduces the storage space required for repetitive or similar data.

Compressed Representations: Data in segments is compressed to reduce its size in deep storage. Compression reduces the storage cost and speeds up data transfer between storage and processing nodes.
Bitmap Indexes: Bitmap indexes are utilized for fast querying, especially for filtering and searching operations. They allow for efficient querying on high-cardinality columns by quickly identifying the rows that match the query criteria. (A short illustrative sketch of dictionary encoding and bitmap indexes appears at the end of this article.)

Other Features of Druid

Apache Druid includes additional advanced features that enhance its performance and flexibility in data analytics. These features include:

Multiple Levels of Caching

Druid implements caching at various levels within its architecture, from the broker to the data nodes. This multi-tiered caching strategy includes:

Broker Caching: Caches the results of queries at the broker level, which can significantly speed up response times for repeated queries.

Historical Node Caching: Caches data segments in historical nodes, improving query performance on frequently accessed data.

Query-Level Caching: Allows caching of partial query results, which can be reused in subsequent queries.

Query Lanes and Prioritization

Druid supports query laning and prioritization, which are essential for managing and optimizing query workloads. This feature allows administrators to categorize and prioritize queries based on their importance or urgency. For example, critical real-time queries can be prioritized over less urgent batch queries, ensuring that important tasks are completed first.

Approximation and Vectorization

Approximation Algorithms: Druid can use various approximation algorithms (like HyperLogLog, Theta Sketches, etc.) to provide faster query responses, especially useful for aggregations and counts over large datasets. These algorithms trade a small amount of accuracy for significant gains in speed and resource efficiency.

Vectorization: Vectorization refers to processing data in batches rather than one element at a time. Vectorized query execution allows Druid to perform operations on multiple data points simultaneously, significantly speeding up query performance, especially on modern hardware with SIMD (Single Instruction, Multiple Data) capabilities.

Summary

The components and features discussed above make Druid a highly efficient and adaptable system for real-time analytics, capable of handling large volumes of data with varying query workloads while ensuring fast and resource-efficient data processing.
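As referenced earlier, here is a small, self-contained Python sketch, not Druid code, that illustrates dictionary encoding and per-value bitmap indexing on a toy string column. The column values and the filter are invented, and plain Python sets stand in for the compressed bitmap structures a real engine would use.

Python
# Toy illustration of dictionary encoding and a per-value bitmap index (not Druid internals).
column = ["US", "DE", "US", "FR", "DE", "US"]          # raw values of a string column

# Dictionary encoding: store each distinct value once; rows keep compact integer ids.
dictionary = {value: idx for idx, value in enumerate(sorted(set(column)))}
encoded = [dictionary[value] for value in column]       # e.g. [2, 0, 2, 1, 0, 2]

# Bitmap index: one row-id set per dictionary value (real systems use compressed bitmaps).
bitmaps = {value: set() for value in dictionary}
for row_id, value in enumerate(column):
    bitmaps[value].add(row_id)

# A filter such as country = 'US' becomes a bitmap lookup instead of a full column scan.
matching_rows = sorted(bitmaps["US"])                   # [0, 2, 5]
print(dictionary, encoded, matching_rows)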
Big data analytics and data science have come a long way in recent years, and as we step into 2024, the landscape is evolving at an unprecedented pace. In this article, we will delve into the exciting trends that are shaping the future of big data analytics. From real-time insights to data governance and the democratization of data, these trends are redefining how organizations leverage their data to gain a competitive edge. Real-Time Data and Insights Accessing real-time data for analysis has become a game-changer across various industries. Gone are the days when making decisions based on historical data was sufficient. Imagine trading Bitcoin based on last week's prices or crafting social media content based on trends from a month ago. Real-time data has already transformed industries like finance and social media, and its applications continue to expand. For example, Walmart has built a massive hybrid cloud infrastructure to manage supply chains and analyze sales in real time. This allows them to react swiftly to market changes and customer demands. Real-time, automated decision-making is becoming the norm, with machine learning and artificial intelligence playing a crucial role. Real-Time, Automated Decision Making Machine learning (ML) and artificial intelligence (AI) are already revolutionizing industries such as healthcare and manufacturing. In healthcare, intelligent systems can detect and diagnose medical conditions, while in manufacturing, AI-driven systems can predict equipment failures and automatically reroute production processes to prevent disruptions. Beyond these examples, we see applications like email marketing software that can automatically determine the winning variant in an A/B test and apply it to other campaigns. The analysis of customer data is also becoming automated, enabling businesses to determine loan eligibility and make informed lending decisions. However, many organizations still retain a manual approval step for critical decisions to ensure oversight and control. Heightened Veracity of Big Data Analytics As the volume of data continues to grow exponentially, ensuring data accuracy and quality is paramount. Bad data can lead to poor decision-making and costly errors. Data analytics tools now possess the capability to identify and flag data anomalies, but businesses must also focus on the integrity of their data pipelines. Understanding the right data sources, analysis methods, and user roles for each use case is essential for maintaining data health and reducing downtime. Data observability platforms, such as Monte Carlo, monitor data freshness, schema, volume, distribution, and lineage, helping organizations maintain high data quality and discoverability. Data Governance With the ever-increasing volume of data, proper data governance becomes crucial. Compliance with regulations like GDPR and CCPA is not only a legal requirement but also essential for protecting a company's reputation. Data breaches can have severe consequences, making data security a top priority. Implementing a data certification program and using data catalogs to outline data usage standards can help ensure data compliance across all departments. By establishing a central set of governance standards, organizations can maintain control over data usage while allowing multiple stakeholders access to data for their specific needs. Storage and Analytics Platforms Cloud technology has revolutionized data storage and processing. 
Businesses no longer need to worry about physical storage limitations or acquiring additional hardware. Cloud platforms like Snowflake, Redshift, and BigQuery offer virtually infinite storage and processing capabilities. Cloud-based data processing enables multiple stakeholders to access data simultaneously without performance bottlenecks. This accessibility, combined with robust security measures, allows organizations to access up-to-the-minute data from anywhere, facilitating data-driven decision-making. Processing Data Variety With the surge in data volume comes an increase in data variety. Data can originate from various sources, and managing diverse data formats can be challenging. Fortunately, tools like Fivetran provide connectors to over 160 data sources, simplifying data integration. Snowflake's partnerships with services like Qubole bring machine learning and AI capabilities directly into their data platform. This approach allows businesses to work with data from different sources without the need for immediate data consistency. The emphasis is on collating data from various sources and finding ways to use it together effectively. Democratization and Decentralization of Data Traditionally, business analysts relied on in-house data scientists to extract and analyze data. However, the landscape has evolved, with services and tools enabling non-technical users to engage with data. Analytics engineering is gaining prominence, focusing on empowering stakeholders to answer their questions using data. Modern business intelligence tools like Tableau, Mode, and Looker emphasize visual exploration, dashboards, and self-service analytics. The movement to democratize data is in full swing, enabling more individuals within organizations to access and leverage data for decision-making. No-Code Solutions No-code and low-code tools are transforming the big data analytics space by removing the need for coding knowledge. These tools empower stakeholders to work with data without relying on data teams, freeing up data scientists for more complex tasks. No-code solutions promote data-driven decisions throughout the organization, as data engagement becomes accessible to everyone. Microservices and Data Marketplaces Microservices break down monolithic applications into smaller, independently deployable services. This simplifies deployment and makes it easier to extract relevant information. Data can be remixed and reassembled to generate different scenarios, aiding in decision-making. Data marketplaces fill gaps in data or augment existing information. These platforms enable organizations to access additional data sources to enhance their analytics efforts, making data-driven decisions more robust. Data Mesh The concept of a data mesh is gaining traction, particularly in organizations dealing with vast amounts of data. Instead of a monolithic data lake, data mesh decentralizes core components into distributed data products owned independently by cross-functional teams. Empowering these teams to manage and analyze their data fosters a culture of data ownership and collaboration. Data becomes a shared asset, with each team contributing value relevant to its area of the business. Leveraging GenAI and RAG Generative AI (GenAI) and retrieval-augmented generation (RAG) are emerging trends poised to transform big data analytics. GenAI pushes the boundaries of traditional data analysis by generating synthetic datasets and automating content creation. 
This innovation opens new avenues for predictive analytics and data visualization. RAG enhances AI models by integrating real-time data retrieval, ensuring accurate and contextually relevant insights. Integrating RAG into data systems requires advanced data pipeline architecture skills to support its dynamic nature. The future of big data analytics is characterized by real-time insights, automated decision-making, data quality, governance, cloud scalability, data variety management, democratization, no-code solutions, microservices, data marketplaces, and the data mesh concept. Embracing these trends will empower organizations to unlock the full potential of their data, regardless of their size or budget. The future is bright for those who adapt and harness the power of big data analytics in innovative ways.
We are generating more data every day than ever before. By 2025, it's estimated that 463 exabytes of data will be created each day globally – that's the equivalent of 212,765,957 DVDs. Much of this data is "cold data"; rarely accessed after being stored, yet still needing to be retained for the long term. Managing cold data is becoming a rising challenge as the world's data generation grows exponentially. Traditional storage media like magnetic tape and hard disk drives struggle to provide the longevity, capacity, accessibility, and affordability needed for sustainable cold data storage. But one startup has developed an innovative solution that could be the answer. Cerabyte, a Germany-based startup founded in 2021, has created a nanotechnology storage solution designed specifically for retaining cold data indefinitely. The system uses extremely durable glass-ceramic media that can withstand extreme temperatures, UV radiation, and other environmental hazards. Data is written via near-microscopic laser etching, allowing massive amounts of data to be encoded in a very small space. The Promise of Virtually "Unlimited Lifespan" Unlike magnetic-based storage media, Cerabyte's glass-ceramic disks do not degrade over time. The company promises a "virtually unlimited lifespan," with durability spanning thousands of years. This makes it ideal for cold data that needs to be retained but is rarely accessed once stored. Cerabyte's CEO, Christian Pflaum, stated that their goal is to "store data forever." While traditional storage systems require data to be copied to new disks every few years before the old ones fail, Cerabyte's disks have no shelf life. This provides substantial cost savings over decades of storage. Leveraging Existing Manufacturing for Scalability A key innovation that allows Cerabyte to scale is its use of existing manufacturing pipelines. The glass layers use the same Gorilla Glass that is mass-produced for billions of smartphone displays each year. By tapping into existing infrastructure, Cerabyte avoids the immense upfront investments often needed for new technologies. The prototype Cerabyte demo unit packs one petabyte per rack. However, the company has already validated its roadmap to achieve storage densities comparable to leading disk and tape solutions within a few years. Eventually, capacities up to one exabyte per rack may be possible by incorporating semiconductor fab techniques to scale the laser etching process nanometer by nanometer. Interest Growing From Hyperscalers Hyperscale data centers have already expressed notable interest, according to Cerabyte. Increasing storage densities while reducing costs is hugely impactful for these massive facilities accumulating cold data. Cerabyte promises over 75% cost savings per terabyte compared to HDDs and 50% compared to tape, with practically zero ongoing electricity costs for data retention. Pflaum stated that all leading hyperscalers are aware of Cerabyte's solution. Validation deals are already pending, with announcements expected over the next few months. Meeting the Storage Needs of Science and Medicine For scientific institutions and medical centers generating massive volumes of research data, Cerabyte offers solutions to two critical storage needs — extreme longevity and accessibility. Data from areas like genomics, physics simulations, and space exploration require durable cold storage for 50-100 years. As analytic and computing capabilities improve over time, researchers also want the ability to revisit old datasets. 
However, the largest scientific organizations say today's tape and optical disk systems restrict them to only keeping 5-10% of data indefinitely due to cost constraints. Cerabyte provides virtually unlimited media lifespan at an equivalent or lower storage dollar per terabyte. This allows full datasets to remain accessible for future analysis instead of selective subset retention. In medical fields like imaging and diagnostics, patient record storage minimums range from 10-30 years, depending on jurisdiction. Rapid retrieval is also critical. Cerabyte's storage densities could potentially allow entire medical histories to be stored in active archives at a regional healthcare system's data center rather than relying on external tape storage. To serve these use cases optimally, Cerabyte is focused on two core milestones: Improving storage density/volume 10-100X over the next 2-3 years to achieve parity with top tape solutions. Optimizing seek/access times to under 10 seconds for random data retrieval from storage libraries of 1000s-10,000s media units. Achieving these benchmarks will go a long way toward getting Cerabyte's glass-based media adopted for the extreme cold storage requirements of scientific and medical organizations at scale in the years ahead. Key Takeaways With the world's data volume growing at staggering rates, new solutions are desperately needed to manage cold data. Startups like Cerabyte are taking revolutionary approaches that could reshape the data center landscape in the years ahead. Here are some of the key innovations that make Cerabyte's system compelling: Media lifespan of thousands of years vs. 5-10 years for current mediums. Taps into existing manufacturing pipelines for low-cost scaling. Laser etching for incredible data density up to one exabyte per rack. 75% cheaper per TB than HDDs and 50% cheaper than tape solutions. Virtually no electricity needed for data retention = massive power savings. For any organization accumulating vast amounts of inactive yet valuable data, scientific research, medical, governmental, hyperscale cloud, Cerabyte may offer the affordable long-durability solution the industry sorely needs. We'll be keeping an eye on this company as they look to make data storage sustainable for the exabyte era ahead.
This tutorial illustrates B2B push-style application integration with APIs and internal integration with messages. We have the following use cases:

Ad Hoc Requests for information (Sales, Accounting) that cannot be anticipated in advance.

Two Transaction Sources: A) internal Order Entry UI, and B) B2B partner OrderB2B API.

The Northwind API Logic Server provides APIs and logic for both transaction sources:

Self-Serve APIs to support ad hoc integration and UI dev, providing security (e.g., customers see only their accounts).

Order Logic: enforcing database integrity and Application Integration (alert shipping).

A custom API to match an agreed-upon format for B2B partners.

The Shipping API Logic Server listens to Kafka and processes the message.

Key Architectural Requirements: Self-Serve APIs and Shared Logic

This sample illustrates some key architectural considerations:

Requirement | Poor Practice | Good Practice | Best Practice | Ideal
Ad Hoc Integration | ETL | APIs | Self-Serve APIs | Automated Self-Serve APIs
Logic | Logic in UI | Reusable Logic | Declarative Rules | Declarative Rules, extensible with Python
Messages | - | Kafka | Kafka | Kafka Logic Integration

We'll further expand on these topics as we build the system, but we note some best practices:

APIs should be self-serve, not requiring continuing server development. APIs avoid the nightly Extract, Transform, and Load (ETL) overhead.

Logic should be re-used over the UI and API transaction sources. Logic in UI controls is undesirable since it cannot be shared with APIs and messages.

Using This Guide

This guide was developed with API Logic Server, which is open-source and available here. The guide shows the highlights of creating the system. The complete Tutorial in the Appendix contains detailed instructions to create the entire running system. The information here is abbreviated for clarity.

Development Overview

This overview shows all the key code and procedures to create the system above. We'll be using API Logic Server, which consists of a CLI plus a set of runtimes for automating APIs, logic, messaging, and an admin UI. It's an open-source Python project with a standard pip install.

1. ApiLogicServer Create: Instant Project

The CLI command below creates an ApiLogicProject by reading your schema. The database is Northwind (Customer, Orders, Items, and Product), as shown in the Appendix. Note: the db_url value is an abbreviation; you normally supply a SQLAlchemy URL. The sample NW SQLite database is included in ApiLogicServer for demonstration purposes.

$ ApiLogicServer create --project_name=ApiLogicProject --db_url=nw-

The created project is executable; it can be opened in an IDE and executed. One command has created meaningful elements of our system: an API for ad hoc integration and an Admin App. Let's examine these below.

API: Ad Hoc Integration

The system creates a JSON:API with endpoints for each table, providing filtering, sorting, pagination, optimistic locking, and related data access. JSON:APIs are self-serve: consumers can select their attributes and related data, eliminating reliance on custom API development. In this sample, our self-serve API meets our Ad Hoc Integration needs and unblocks Custom UI development.

Admin App: Order Entry UI

The create command also creates an Admin App: multi-page, multi-table with automatic joins, ready for business user agile collaboration and back office data maintenance. This complements custom UIs you can create with the API. Multi-page navigation controls enable users to explore data and relationships.
For example, they might click the first Customer and see their Orders and Items. We created an executable project with one command that completes our ad hoc integration with a self-serve API.

2. Customize: In Your IDE

While API/UI automation is a great start, we now require Custom APIs, Logic, and Security. Such customizations are added in your IDE, leveraging all its services for code completion, debugging, etc. Let's examine these.

Declare UI Customizations

The admin app is not built with complex HTML and JavaScript. Instead, it is configured with the ui/admin/admin.yml file, automatically created from your data model by the ApiLogicServer create command. You can customize this file in your IDE to control which fields are shown (including joins), hide/show conditions, help text, etc. This makes it convenient to use the Admin App to enter an Order and OrderDetails. Note the automation for automatic joins (Product Name, not ProductId) and lookups (select from a list of Products to obtain the foreign key). If we attempt to order too much Chai, the transaction properly fails due to the Check Credit logic described below.

Check Credit Logic: Multi-Table Derivation and Constraint Rules, 40X More Concise

Such logic (multi-table derivations and constraints) is a significant portion of a system, typically nearly half. API Logic Server provides spreadsheet-like rules that dramatically simplify and accelerate logic development. The five check credit rules (sketched below, just before the B2B Custom Resource section) represent the same logic as 200 lines of traditional procedural code. Rules are 40X more concise than traditional code, as shown here. Rules are declared in Python and simplified with IDE code completion. Rules can be debugged using standard logging and the debugger. Rules operate by handling SQLAlchemy events, so they apply to all ORM access, whether by the API engine or your custom code. Once declared, you don't need to remember to call them, which promotes quality. The rules above prevented the too-big order with multi-table logic: copying the Product Price, computing the Amount, rolling it up to the AmountTotal and Balance, and checking the credit. These five rules also govern changing orders, deleting them, picking different parts, and about nine automated transactions. Implementing all this by hand would otherwise require about 200 lines of code. Rules are a unique and significant innovation, providing meaningful improvements over procedural logic:

Characteristic | Procedural | Declarative | Why It Matters
Reuse | Not Automatic | Automatic - all Use Cases | 40X Code Reduction
Invocation | Passive - only if called | Active - call not required | Quality
Ordering | Manual | Automatic | Agile Maintenance
Optimizations | Manual | Automatic | Agile Design

For more on the rules, click here.

Declare Security: Customers See Only Their Own Row

Declare row-level security using your IDE to edit logic/declare_security.py (see screenshot below). An automatically created admin app enables you to configure roles, users, and user roles. If users now log in as ALFKI (configured with role customer), they see only their customer row. Observe that the console log at the bottom shows how the filter worked. Declarative row-level security ensures users see only the rows authorized for their roles.

3. Integrate: B2B and Shipping

We now have a running system, an API, logic, security, and a UI. Now, we must integrate with the following:

B2B partners: We'll create a B2B Custom Resource.

OrderShipping: We add logic to Send an OrderShipping Message.
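Before building those, here is a hedged sketch of what the five check credit rules referenced above typically look like in the project's logic/declare_logic.py. It is an illustration based on the public API Logic Server Northwind sample; the model and column names (Customer.Balance, Order.AmountTotal, OrderDetail.Amount, etc.) are assumed from that sample and may differ in your schema.

Python
# Sketch of the five check-credit rules (names assumed from the Northwind sample).
from logic_bank.logic_bank import Rule
from database import models

Rule.constraint(validate=models.Customer,       # reject the transaction if balance exceeds credit
                as_condition=lambda row: row.Balance <= row.CreditLimit,
                error_msg="balance ({row.Balance}) exceeds credit ({row.CreditLimit})")

Rule.sum(derive=models.Customer.Balance,        # derive balance as the sum of unshipped order totals
         as_sum_of=models.Order.AmountTotal,
         where=lambda row: row.ShippedDate is None)

Rule.sum(derive=models.Order.AmountTotal,       # derive order total as the sum of its item amounts
         as_sum_of=models.OrderDetail.Amount)

Rule.formula(derive=models.OrderDetail.Amount,  # item amount = unit price * quantity
             as_expression=lambda row: row.UnitPrice * row.Quantity)

Rule.copy(derive=models.OrderDetail.UnitPrice,  # copy the product's price into the order row
          from_parent=models.Product.UnitPrice)

Because the rules fire on SQLAlchemy flush events, the same five declarations govern the Admin App, the self-serve API, and the custom OrderB2B endpoint described next.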
B2B Custom Resource

The self-serve API does not conform to the format required for a B2B partnership. We need to create a custom resource. You can create custom resources by editing customize_api.py using standard Python, Flask, and SQLAlchemy. A custom OrderB2B endpoint is shown below. The main task here is to map a B2B payload onto our logic-enabled SQLAlchemy rows. API Logic Server provides a declarative RowDictMapper class you can use as follows:

Declare the row/dict mapping; see the OrderB2B class in the lower pane. Note the support for lookup so that partners can send ProductNames, not ProductIds.

Create the custom API endpoint; see the upper pane: add def OrderB2B to customize_api.py to create a new endpoint. Use the OrderB2B class to transform API request data to SQLAlchemy rows (dict_to_row). The automatic commit initiates the shared logic described above to check credit and reorder products.

Our custom endpoint required under ten lines of code and the mapper configuration.

Produce OrderShipping Message

Successful orders must be sent to Shipping in a predesignated format. We could certainly POST an API, but Messaging (here, Kafka) provides significant advantages:

Async: Our system will not be impacted if the Shipping system is down. Kafka will save the message and deliver it when Shipping is back up.

Multi-cast: We can send a message that multiple systems (e.g., Accounting) can consume.

The content of the message is a JSON string, just like an API. Just as you can customize APIs, you can complement rule-based logic using Python events:

Declare the mapping; see the OrderShipping class in the right pane. This formats our Kafka message content in the format agreed upon with Shipping.

Define an after_flush event, which invokes send_order_to_shipping. This is called by the logic engine, which passes the SQLAlchemy models.Order row.

send_order_to_shipping uses OrderShipping.row_to_dict to map our SQLAlchemy order row to a dict and uses the Kafka producer to publish the message.

Rule-based logic is customizable with Python, producing a Kafka message with 20 lines of code here.

4. Consume Messages

The Shipping system illustrates how to consume messages. The sections below show how to create and start the shipping server, and how to use our IDE to add the consuming logic.

Create/Start the Shipping Server

This shipping database was created with AI. To simplify matters, API Logic Server has installed the shipping database automatically. We can, therefore, create the project from this database and start it:

1. Create the Shipping Project:

ApiLogicServer create --project_name=shipping --db_url=shipping

2. Start your IDE (e.g., code shipping) and establish your venv.

3. Start the Shipping Server: F5 (configured to use a different port).

The core Shipping system was automated by ChatGPT and ApiLogicServer create. We add 15 lines of code to consume Kafka messages, as shown below.

Consuming Logic

To consume messages, we enable message consumption, configure a mapping, and provide a message handler as follows.

1. Enable Consumption

Shipping is pre-configured to enable message consumption with a setting in config.py:

KAFKA_CONSUMER = '{"bootstrap.servers": "localhost:9092", "group.id": "als-default-group1", "auto.offset.reset":"smallest"}'

When the server is started, it invokes flask_consumer() (shown below). This calls the pre-supplied FlaskKafka, which handles the Kafka consumption (listening), thread management, and the handle annotation used below.
This housekeeping task is pre-created automatically. FlaskKafka was inspired by the work of Nimrod (Kevin) Maina in this project. Many thanks!

2. Configure a Mapping

As we did for our OrderB2B Custom Resource, we configure an OrderToShip mapping class to map the message onto our SQLAlchemy Order object.

3. Provide a Consumer Message Handler

We provide the order_shipping handler in kafka_consumer.py:

Annotate the topic handler method, providing the topic name. This is used by FlaskKafka to establish a Kafka listener.

Provide the topic handler code, leveraging the mapper noted above. It is called by FlaskKafka per the method annotations.

Test It

You can use your IDE terminal window to simulate a business partner posting a B2BOrder. You can set breakpoints in the code described above to explore system operation.

ApiLogicServer curl "'POST' 'http://localhost:5656/api/ServicesEndPoint/OrderB2B'" --data '
{"meta": {"args": {"order": {
    "AccountId": "ALFKI",
    "Surname": "Buchanan",
    "Given": "Steven",
    "Items": [
        {"ProductName": "Chai", "QuantityOrdered": 1},
        {"ProductName": "Chang", "QuantityOrdered": 2}
    ]
}}}}'

Use Shipping's Admin App to verify the Order was processed.

Summary

These applications have demonstrated several types of application integration:

Ad Hoc integration via self-serve APIs.

Custom integration via custom APIs to support business agreements with B2B partners.

Message-based integration to decouple internal systems by removing the requirement that all systems must always be running.

We have also illustrated several technologies noted in the ideal column:

Requirement | Poor Practice | Good Practice | Best Practice | Ideal
Ad Hoc Integration | ETL | APIs | Self-Serve APIs | Automated Creation of Self-Serve APIs
Logic | Logic in UI | Reusable Logic | Declarative Rules | Declarative Rules, extensible with Python
Messages | - | Kafka | Kafka | Kafka Logic Integration

API Logic Server provides automation for the ideal practices noted above:

1. Creation: instant ad hoc API (and Admin UI) with the ApiLogicServer create command.

2. Declarative Rules: Security and multi-table logic reduce the backend half of your system by 40X.

3. Kafka Logic Integration: Produce messages from logic events. Consume messages by extending kafka_consumer. Services include RowDictMapper to transform rows and dicts, and FlaskKafka for Kafka consumption, threading, and annotation invocation.

4. Standards-Based Customization: Standard packages (Python, Flask, SQLAlchemy, Kafka...), using standard IDEs.

Creation, logic, and integration automation have enabled us to build two non-trivial systems with a remarkably small amount of code:

Type | Code
Custom B2B API | 10 lines
Check Credit Logic | 5 rules
Row Level Security | 1 security declaration
Send Order to Shipping | 20 lines
Process Order in Shipping | 30 lines
Mapping configurations to transform rows and dicts | 45 lines

Automation dramatically shortens time to market, with standards-based customization using your IDE, Python, Flask, SQLAlchemy, and Kafka. For more information on API Logic Server, click here.

Appendix

Full Tutorial: To recreate this system and explore the running code, including Kafka, click here. It should take 30-60 minutes, depending on whether you already have Python and an IDE installed.

Sample Database: The sample database is a SQLite version of Northwind (Customer, Order, OrderDetail, and Product). To see a database diagram, click here. This database is included when you pip install ApiLogicServer.
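For readers who want to see the raw consumption loop that a helper like FlaskKafka wraps, here is a generic sketch using the plain confluent-kafka client. It is not the FlaskKafka code from the project; the consumer settings mirror the KAFKA_CONSUMER value shown above, while the topic name "order_shipping" and the print-based handling are assumptions for illustration.

Python
# Generic Kafka consumption sketch (plain confluent-kafka, NOT the project's FlaskKafka helper).
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "als-default-group1",
    "auto.offset.reset": "smallest",
})
consumer.subscribe(["order_shipping"])   # topic name assumed for illustration

try:
    while True:
        msg = consumer.poll(1.0)                              # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"consumer error: {msg.error()}")
            continue
        order = json.loads(msg.value().decode("utf-8"))       # the message body is a JSON string
        print("received order for shipping:", order)          # a real handler maps it to rows
except KeyboardInterrupt:
    pass
finally:
    consumer.close()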
Welcome to the first post in our exciting series on mastering offline data pipeline best practices, focusing on the potent combination of Apache Airflow and data processing engines like Hive and Spark. This post focuses on elevating your data engineering game, streamlining your data workflows, and significantly cutting computing costs. Optimizing offline data pipelines has become a necessity with the growing complexity and scale of modern data pipelines. In this kickoff post, we delve into the intricacies of Apache Airflow and AWS EMR, a managed cluster platform for big data processing. Working together, they form the backbone of many modern data engineering solutions. However, they can become a source of increased costs and inefficiencies without the right optimization strategies. Let's dive into the journey to transform your data workflows and embrace cost-efficiency in your data engineering environment.

Why Focus on Airflow and Apache Hive?

Before diving deep into our best practices, let us understand why we focus on these specific technologies in our post. Airflow, an open-source platform, is a powerful tool for orchestrating complex computational workflows and data processing. On the other hand, AWS EMR (Elastic MapReduce) provides a managed cluster platform that simplifies running big data frameworks. Combined, they offer a robust environment for managing data pipelines but can incur significant costs if not optimized correctly. Apache Hive is widely recognized for its exceptional ability to efficiently manage and query massive datasets in offline data processing and warehousing scenarios. The architecture of Hive is explicitly optimized for batch processing of large data volumes, which is crucial in data warehousing scenarios. Hive is an optimal selection for organizations with significant big data and analytics demands due to its distributed storage and processing capabilities, enabling it to seamlessly handle data at a petabyte scale.

Key Configuration Parameters for Apache Hive Jobs

Timeouts

Purpose: Prevents jobs from running indefinitely.
Parameter: execution_timeout

Python
from datetime import timedelta
from airflow.operators.hive_operator import HiveOperator

hive_task = HiveOperator(
    task_id='hive_task',
    hql='SELECT * FROM your_table;',
    execution_timeout=timedelta(hours=2),
)

Retries

Purpose: Handles transient errors by re-attempting the job. The number of retries that should be performed before failing the task.
Parameter: retries

Python
hive_task = HiveOperator(
    task_id='hive_task',
    hql='SELECT * FROM your_table;',
    retries=3,
)

Retry Delay

Purpose: Sets the delay between retries.
Parameter: retry_delay

Python
from datetime import timedelta

hive_task = HiveOperator(
    task_id='hive_task',
    hql='SELECT * FROM your_table;',
    retry_delay=timedelta(minutes=5),
)

Retry Exponential Backoff

Purpose: Allows progressively longer waits between retries by using an exponential backoff algorithm on the retry delay (the delay will be converted into seconds).
Parameter: retry_exponential_backoff

Python
from datetime import timedelta

hive_task = HiveOperator(
    task_id='hive_task',
    hql='SELECT * FROM your_table;',
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
)

Task Concurrency

Purpose: Limits the number of tasks run simultaneously.
Parameter: task_concurrency

Python
hive_task = HiveOperator(
    task_id='hive_task',
    hql='SELECT * FROM your_table;',
    task_concurrency=5,
)

Best Practices for Job Backfilling

Offline data pipeline backfilling in Hive, especially for substantial historical data, requires a strategic approach to ensure efficiency and accuracy. Here are some best practices:

Incremental Load Strategy: Instead of backfilling all data simultaneously, break the process into smaller, manageable chunks. Incrementally loading data allows for better monitoring, easier error handling, and reduced resource strain.

Leverage Hive's Merge Statement: For updating existing records during backfill, use Hive's MERGE statement. It efficiently updates and inserts data based on specific conditions, reducing the complexity of managing upserts (a short sketch appears at the end of this post).

Data Validation and Reconciliation: After the backfill, validate the data to ensure its integrity. Reconcile the backfilled data against source systems or use checksums to ensure completeness and accuracy.

Resource Allocation and Throttling: Carefully plan the resource allocation for the backfill process. Utilize Hive's configuration settings to throttle the resource usage, ensuring it doesn't impact the performance of concurrent jobs.

Error Handling and Retry Logic: Implement robust error handling and retry mechanisms. In case of failures, having a well-defined retry logic helps maintain the consistency of backfill operations. Refer to the retry parameters in the section above.

Optimize Hive Queries: Use Hive query optimization techniques such as filtering early, minimizing data shuffling, and using appropriate file formats (like ORC or Parquet) for better compression and faster access.

Conclusion

Optimizing Airflow data pipelines on AWS EMR requires a strategic approach focusing on efficiency and cost-effectiveness. By tuning job parameters, managing retries and timeouts, and adopting best practices for job backfilling, organizations can significantly reduce their AWS EMR computing costs while maintaining high data processing standards. Remember, the key is continuous monitoring and optimization. Data pipeline requirements can change, and what works today might not be the best approach tomorrow. In the next post, we will learn how to fine-tune Spark jobs for optimal performance, job reliability, and cost savings. Stay agile, and keep optimizing.
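As referenced above, here is a hedged sketch that combines the incremental-load and MERGE practices in a single Airflow task. The database, table, and column names (warehouse.orders, staging.orders_raw, etc.) are assumptions for illustration, and MERGE requires the target to be a transactional (ACID) Hive table.

Python
# Sketch: per-day incremental backfill using Hive's MERGE statement (upsert).
# warehouse.orders, staging.orders_raw, and the columns are illustrative names only.
from datetime import timedelta
from airflow.operators.hive_operator import HiveOperator

backfill_merge = HiveOperator(
    task_id='backfill_merge_orders',
    hql="""
        MERGE INTO warehouse.orders AS target
        USING (SELECT * FROM staging.orders_raw WHERE order_date = '{{ ds }}') AS source
        ON target.order_id = source.order_id
        WHEN MATCHED THEN UPDATE SET amount = source.amount, status = source.status
        WHEN NOT MATCHED THEN INSERT VALUES
            (source.order_id, source.order_date, source.amount, source.status);
    """,
    execution_timeout=timedelta(hours=2),   # reuse the timeout and retry settings discussed above
    retries=3,
    retry_delay=timedelta(minutes=5),
)

Driving the source filter with Airflow's {{ ds }} template keeps each run to a single day's data, which is the incremental-chunk approach described in the best practices above.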
As much as we say Apache Doris is an all-in-one data platform capable of various analytics workloads, it is always compelling to demonstrate that with real use cases. That's why I would like to share this user story with you. It is about how a fintech service provider leverages the capabilities of Apache Doris in reporting, customer tagging, and data lake analytics and achieves high performance.

This fintech service provider is a long-term user of Apache Doris. They have almost 10 clusters in production, hundreds of Doris backend nodes, and thousands of CPU cores. The total data size is near 1 PB. Every day, they have hundreds of workflows running simultaneously, receive almost 10 billion new data records, and respond to millions of data queries. Before migrating to Apache Doris, they used ClickHouse, MySQL, and Elasticsearch. Then, friction arose from their ever-enlarging data size. They found it hard to scale out the ClickHouse clusters because there were too many dependencies. As for MySQL, they had to switch between various MySQL instances because one MySQL instance had its limits, and cross-instance queries were not supported.

Reporting: From ClickHouse + MySQL to Apache Doris

Data reporting is one of the major services they provide to their customers, and they are bound by an SLA. They used to support this service with a combination of ClickHouse and MySQL, but they found significant fluctuations in their data synchronization duration, making it hard for them to meet the service levels outlined in their SLA. Diagnosis showed that it was because the multiple components added to the complexity and instability of data synchronization tasks. To fix that, they adopted Apache Doris as a unified analytics engine to support data reporting.

Performance Improvements

With Apache Doris, they ingest data via the Broker Load method and reach an SLA compliance rate of over 99% in terms of data synchronization performance. As for data queries, the Doris-based architecture maintains an average query response time of less than 10 seconds and a P90 response time of less than 30 seconds. This is a 50% speedup compared to the old architecture.

Tagging

Tagging is a common operation in customer analytics. You assign labels to customers based on their behaviors and characteristics so that you can divide them into groups and figure out targeted marketing strategies for each group. In the old processing architecture, where Elasticsearch was the processing engine, raw data was ingested and tagged properly. Then, it would be merged into JSON files and imported into Elasticsearch, which provided data services for analysts and marketers. In this process, the merging step was meant to reduce updates and relieve load for Elasticsearch, but it turned out to be a troublemaker: any problematic data in any of the tags could spoil the entire merging operation and thus interrupt the data services. The merging operation was implemented based on Spark and MapReduce and took up to 4 hours. Such a long time frame could encroach on marketing opportunities and lead to unseen losses. Apache Doris then took this over. Apache Doris arranges tag data with its data models, which process data fast and smoothly. The aforementioned merging step can be done by the Aggregate Key model, which aggregates tag data based on the specified Aggregate Key upon data ingestion. The Unique Key model is handy for partial column updates. Again, all you need is to specify the Unique Key.
This enables swift and flexible data updating and saves you from the trouble of replacing the entire flat table. You can also put your detailed data into a Duplicate model to speed up certain queries. In practice, it took the user 1 hour to finish the data ingestion, compared to 4 hours with the old architecture. In terms of query performance, Doris is equipped with well-developed bitmap indexes and techniques tailored to high-concurrency queries, so in this case, it can finish customer segmentation within seconds and reach over 700 QPS in user-facing queries. Data Lake Analytics In data lake scenarios, the data size you need to handle tends to be huge, but the data processing volume in each query tends to vary. To ensure fast data ingestion and high query performance of huge data sets, you need more resources. On the other hand, during non-peak time, you want to scale down your cluster for more efficient resource management. How do you handle this dilemma? Apache Doris has a few features that are designed for data lake analytics, including Multi-Catalog and Compute Node. The former shields you from the headache of data ingestion in data lake analytics, while the latter enables elastic cluster scaling. The Multi-Catalog mechanism allows you to connect Doris to a variety of external data sources so you can use Doris as a unified query gateway without worrying about bulky data ingestion into Doris. The Compute Node of Apache Doris is a backend role that is designed for remote federated query workloads, like those in data lake analytics. Normal Doris backend nodes are responsible for both SQL query execution and data management, while the Compute Nodes in Doris, as the name implies, only perform computation. Compute Nodes are stateless, making them elastic enough for cluster scaling. The user introduces Compute Nodes into their cluster and deploys them with other components in a hybrid configuration. As a result, the cluster automatically scales down during the night, when there are fewer query requests, and scales out during the daytime to handle the massive query workload. This is more resource-efficient. For easier deployment, they have also optimized their Deploy on Yarn process via Skein. As is shown below, they define the number of Compute nodes and the required resources in the YAML file, and then pack the installation file, configuration file, and startup script into the distributed file system. In this way, they can start or stop the entire cluster of over 100 nodes within minutes using one simple line of code. Conclusion For data reporting and customer tagging, Apache Doris smoothens data ingestion and merging steps and delivers high query performance based on its own design and functionality. For data lake analytics, the user improves resource efficiency by elastic scaling of clusters using the Compute Node. Along their journey with Apache Doris, they have also developed a data ingestion task prioritizing mechanism and contributed it to the Doris project. A gesture to facilitate their use case ends up benefiting the whole open-source community. This is a great example of open-source products thriving on user involvement.
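To make the tagging discussion above more concrete, here is a hedged sketch of a Doris Aggregate Key table created over Doris's MySQL-compatible protocol (FE query port 9030 by default). The host, database, table, and column names are assumptions; only the overall DDL shape reflects the Aggregate Key model described above.

Python
# Sketch: creating a tag table with Doris's Aggregate Key model via the MySQL protocol.
# "doris-fe-host", "demo", "customer_tags", and the columns are assumed names; rows that
# share (customer_id, tag_date) are merged on ingestion using the SUM/MAX aggregations.
import pymysql

DDL = """
CREATE TABLE IF NOT EXISTS customer_tags (
    customer_id BIGINT,
    tag_date    DATE,
    clicks      BIGINT SUM DEFAULT "0",
    last_active DATETIME MAX
)
AGGREGATE KEY (customer_id, tag_date)
DISTRIBUTED BY HASH(customer_id) BUCKETS 16
PROPERTIES ("replication_num" = "1")
"""

conn = pymysql.connect(host="doris-fe-host", port=9030, user="root", password="", database="demo")
try:
    with conn.cursor() as cursor:
        cursor.execute(DDL)   # the "demo" database is assumed to exist already
finally:
    conn.close()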
Analytics and OLAP systems revolve around querying operational databases and extracting valuable insights from them. Couchbase recently unveiled Capella Columnar, a new addition to its NoSQL analytics offerings. Capella Columnar supports a diverse array of data sources, including MongoDB, DynamoDB, and MySQL. This blog explores the integration of Capella Columnar with MongoDB, focusing on the Link that orchestrates real-time data ingestion from MongoDB collections into Capella Columnar collections. Once the data is ingested, you can run analytical queries against it directly in Capella Columnar. To analyze data stored in MongoDB collections, you first create a Link with MongoDB as the source. The following section walks through setting up a Link for MongoDB Atlas on Capella Columnar; the process requires minimal effort and provides a smooth, efficient flow of data. As described in the Link documentation, integrating Capella Columnar with MongoDB Atlas requires attention to a few prerequisites, which primarily involve configuring network access and defining appropriate database role permissions in MongoDB Atlas: Granting Network Access Include the IP address 52.15.156.215 in the IP access list. This involves whitelisting the mentioned IP in MongoDB Atlas for proper network access. IP Whitelisting in Mongo Establishing a Minimum Role for Reading the MongoDB ChangeStream Create the necessary role in MongoDB Atlas to ensure adequate permissions for reading the MongoDB ChangeStream. Creating a Role in Mongo Atlas Obtain the MongoDB Connection String Retrieve the MongoDB connection string, which is needed to establish the connection between the Kafka Link and MongoDB Atlas. Steps Involved in Creating Links to MongoDB Create a Capella Columnar Database Navigate to the Capella Columnar tab and create a new database. Create a Datasource Click "Create a Datasource" to begin setting up the Link. Select the Required Source DB Choose MongoDB from the list of available databases. Provide MongoDB Source Details Fill in the required details for MongoDB and the primary key of the target Capella Columnar collection. Collection Name: Create the destination collection where the data will be ingested. Primary Key: Specify the primary key along with its type for the Capella Columnar collection. Remote Name: Provide the name of the collection on MongoDB Atlas. Note: Ensure that the primary key and its type are consistent between the MongoDB Atlas collection and the Capella Columnar collection for proper ingestion. For MongoDB Atlas in this illustration, use _id as the primary key, which is of type ObjectId on Atlas. Time To Connect the Link After providing the necessary details, connect the Link and allow a few minutes for data flow to begin. Monitor Connection State Monitor the connection state, which will transition from DISCONNECTED to CONNECTED once data starts flowing. Analyze Real-Time Data Changes With the Link connected, changes in the MongoDB collection are reflected in the Capella Columnar dataset in real-time.
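As a quick illustration of that last step, once the Link is connected you could run an analytical query against the ingested collection using SQL++. This is only a sketch: the database, scope, collection, and field names below (columnar_db, columnar_scope, orders, country) are hypothetical placeholders, not values from this walkthrough.

SQL
-- Hypothetical names; substitute your own Capella Columnar database, scope, and collection.
SELECT o.country, COUNT(*) AS order_count
FROM columnar_db.columnar_scope.orders AS o
GROUP BY o.country
ORDER BY order_count DESC
LIMIT 10;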
Disconnect the Link (Optional) To stop data ingestion, disconnect the Link. This halts the flow of data, but the ingested data remains accessible for analytical queries. By following these steps, you can easily initiate data ingestion in Capella Columnar, empowering you to perform real-time analytics and gain valuable insights from your data. Additional Resources Follow the provided links to learn more about how Capella Columnar addresses specific needs. Read Couchbase Capella Columnar Adds Real-time Data Analytics Service Watch Couchbase Announces New Capella Columnar Service Read the Advantages of Couchbase for real-time analytics and sign up for the preview
Good data quality is one of the most critical requirements in decoupled architectures like microservices or data mesh. Apache Kafka became the de facto standard for these architectures. However, Kafka is a dumb broker that only stores byte arrays; the Schema Registry enforces message structures. This blog post looks at enhancements that leverage data contracts for policies and rules to enforce good data quality at the field level and for advanced use cases like routing malicious messages to a dead letter queue. From Point-to-Point Spaghetti to Decoupled Microservices With Apache Kafka Point-to-point HTTP/REST APIs create tightly coupled services. Data lakes and lakehouses enforce a monolithic architecture instead of open-minded data sharing and choosing the best technology for each problem. Hence, Apache Kafka became the de facto standard for microservice and data mesh architectures. Data streaming with Kafka is complementary (not competitive!) to APIs, data lakes/lakehouses, and other data platforms: it provides a scalable and decoupled architecture that serves as a single source of record for high-quality, self-service access to real-time data streams, as well as batch and request-response communication. Difference Between Kafka and ETL/ESB/iPaaS Enterprise integration is more challenging than ever before. The IT evolution requires the integration of more and more technologies. Companies deploy applications across edge, hybrid, and multi-cloud architectures. Point-to-point integration is not good enough. Traditional middleware such as MQ, ETL, and ESB does not scale well enough or only processes data in batch instead of real-time. Integration Platform as a Service (iPaaS) solutions are cloud-native but only allow point-to-point integration. Apache Kafka is the new black for integration projects. Data streaming is a new software category. Domain-Driven Design, Microservices, Data Mesh... These approaches use different principles and best practices, but the reality is that the key to a long-living and flexible enterprise architecture is decoupled, independent applications. However, these applications need to share data with each other in good quality. This is where Apache Kafka shines: it decouples applications because of its event store. Consumers don't need to know producers. Domains build independent applications with their own technologies, APIs, and cloud services. Replication between different Kafka clusters enables a global data mesh across data centers and multiple cloud providers or regions. But unfortunately, Apache Kafka itself lacks data quality capabilities. That's where the Schema Registry comes into play. The Need for Good Data Quality and Data Governance in Kafka Topics To ensure data quality in a Kafka architecture, organizations need to implement data quality checks, data cleansing, data validation, and monitoring processes. These measures help identify and rectify data quality issues in real-time, ensuring that the data being streamed is reliable, accurate, and consistent. The Need for Good Data Quality in Kafka Messages Data quality is crucial for most Kafka-based data streaming use cases for several reasons: Real-time decision-making: Data streaming involves processing and analyzing data as it is generated. This real-time aspect makes data quality essential because decisions or actions based on faulty or incomplete data can have immediate and significant consequences. Data accuracy: High-quality data ensures that the information being streamed is accurate and reliable.
Inaccurate data can lead to incorrect insights, flawed analytics, and poor decision-making. Timeliness: In data streaming, data must be delivered promptly. Poor data quality can result in delays or interruptions in data delivery, affecting the effectiveness of real-time applications. Data consistency: Inconsistent data can lead to confusion and errors in processing. Data streaming systems must ensure that data adheres to a consistent schema and format to enable meaningful and accurate analysis, no matter whether a producer or consumer uses real-time data streaming, batch processing, or request-response communication with APIs. Data integration: Data streaming often involves combining data from various sources, such as sensors, databases, and external feeds. High-quality data is essential for seamless integration and for ensuring that data from different sources can be harmonized for analysis. Regulatory compliance: In many industries, compliance with data quality and data governance regulations is mandatory. Failing to maintain data quality in data streaming processes can result in legal and financial repercussions. Cost efficiency: Poor data quality can lead to inefficiencies in data processing and storage. Unnecessary processing of low-quality data can strain computational resources and lead to increased operational costs. Customer satisfaction: Compromised data quality in applications directly impacts customers; it can lead to dissatisfaction, loss of trust, and even attrition. Rules Engine and Policy Enforcement in Kafka Topics With Schema Registry Confluent designed the Schema Registry to manage and store the schemas of data that are shared between different systems in a Kafka-based data streaming environment. Messages from Kafka producers are validated against the registered schema. The Schema Registry provides a serving layer for your metadata: a RESTful interface for storing and retrieving your Avro, JSON Schema, and Protobuf schemas. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility settings. Schema Registry provides serializers that plug into Apache Kafka clients and handle schema storage and retrieval for Kafka messages that are sent in any of the supported formats. Schema Registry is available on GitHub under the Confluent Community License, which allows deployment in production scenarios with no licensing costs. It became the de facto standard for ensuring data quality and governance in Kafka projects across all industries. Enforcing the Message Structure as the Foundation of Good Data Quality Confluent Schema Registry enforces message structure by serving as a central repository for schemas in a Kafka-based data streaming ecosystem. Here's how it enforces message structure and rejects invalid messages: messages produced by Kafka producers must adhere to the registered schema, and a message is rejected if it doesn't match that schema. This behavior ensures that only well-structured data is published and processed. Schema Registry even supports schema evolution for data interoperability, so producers and consumers can use different schema versions. Find a detailed explanation and the limitations in the Confluent documentation. Schema validation happens on the client side, using the schemas stored in the Schema Registry.
This is not good enough for some scenarios, like regulated markets, where the infrastructure provider cannot trust each data producer. Hence, Confluent's commercial offering added broker-side schema validation. Attribute-Based Policies and Rules in Data Contracts The validation of the message schema is a great first step. However, many use cases require schema validation and policy enforcement at the field level, i.e., validating each attribute of the message by itself with custom rules. Welcome to data contracts. Disclaimer: The following add-on for Confluent Schema Registry is only available for Confluent Platform and Confluent Cloud. If you use any other Kafka service and schema registry, take this solution as an inspiration for building your own data governance suite - or migrate to Confluent :-) Data contracts support various rules, including data quality rules, field-level transformations, event-condition-action rules, and complex schema evolution. Look at the Confluent documentation "Data Contracts for Schema Registry" to learn all the details. Data Contracts and Data Quality Rules for Kafka Messages As described in the Confluent documentation, a data contract specifies and supports the following aspects of an agreement: Structure: This is the part of the contract that is covered by the schema, which defines the fields and their types. Integrity constraints: This includes declarative constraints or data quality rules on the domain values of fields, such as the constraint that age must be a positive integer. Metadata: Metadata is additional information about the schema or its constituent parts, such as whether a field contains sensitive information. Metadata can also include documentation for a data contract, such as who created it. Rules or policies: These data rules or policies can enforce that a field that contains sensitive information must be encrypted, or that a message containing an invalid age must be sent to a dead letter queue. Change or evolution: This implies that data contracts are versioned and can support declarative migration rules for how to transform data from one version to another, so that even changes that would normally break downstream components can be easily accommodated. Example: PII Data, Enforced Encryption, and Error Handling With a Dead Letter Queue One of the built-in rule types is Google Common Expression Language (CEL), which supports data quality rules. Rules can enforce good data quality or encryption of an attribute like the credit card number. You can also configure advanced routing logic. For instance, for error handling: if the expression "size(message.id) == 9" does not hold, the streaming platform forwards the message to a dead letter queue for further processing, using the configuration "dlq.topic": "bad-data". The dead letter queue (DLQ) is a complex (but very important) topic of its own. Data Contracts as a Foundation for New Data Streaming Products and Integration With Apache Flink Schema Registry should be the foundation of any Kafka project. Data contracts enforce good data quality and interoperability between independent microservices. Each business unit and its data products can choose any technology or API. However, data sharing with others works only with good (enforced) data quality. No matter whether you use Confluent Cloud or not, you can learn from the SaaS offering how schemas and data contracts enable data consistency and faster time to market for innovation.
Products like Data Catalog, Data Lineage, Confluent Stream Sharing, or the out-of-the-box integration with serverless Apache Flink rely on a good internal data governance strategy with schemas and data contracts. Do you already leverage data contracts in your Confluent environment? If you are not a Confluent user, how do you solve data consistency issues and enforce good data quality? Let’s connect on LinkedIn and discuss it!
SingleStore provides a Change Data Capture (CDC) solution to stream data from MongoDB to SingleStore Kai. In this article, we'll see how to connect an Apache Kafka broker to MongoDB Atlas and then stream the data from MongoDB Atlas to SingleStore Kai using the CDC solution. We'll also use Metabase to create a simple analytics dashboard for SingleStore Kai. The notebook file used in this article is available on GitHub. What Is CDC? CDC is a way to keep track of changes that happen in a database or a system. SingleStore now provides a CDC solution that works with MongoDB. To demonstrate the CDC solution, we'll use a Kafka broker to stream data to a MongoDB Atlas cluster and then use the CDC pipeline to propagate the data from MongoDB Atlas to SingleStore Kai. We'll also create a simple analytics dashboard using Metabase. Figure 1 shows the high-level architecture of our system. Figure 1. High-Level Architecture (Source: SingleStore) We'll focus on other scenarios using the CDC solution in future articles. MongoDB Atlas We'll use MongoDB Atlas in an M0 Sandbox. We'll configure an admin user with atlasAdmin privileges under Database Access. We'll temporarily allow access from anywhere (IP Address 0.0.0.0/0) under Network Access. We'll note down the username, password, and host. Apache Kafka We'll configure a Kafka broker to stream data into MongoDB Atlas. We'll use a Jupyter Notebook to achieve this. First, we'll install some libraries:

Shell
!pip install pymongo kafka-python --quiet

Next, we'll connect to MongoDB Atlas and the Kafka broker:

Python
from kafka import KafkaConsumer
from pymongo import MongoClient

try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    db = client.adtech
    print("Connected successfully")
except:
    print("Could not connect")

consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]
)

We'll replace <username>, <password> and <host> with the values that we saved earlier from MongoDB Atlas. Initially, we'll load 100 records into MongoDB Atlas, as follows:

Python
MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break
    try:
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))
        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }
        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: Could not insert data - {str(e)}")

The data should load successfully and we should see a database called adtech with a collection called events. Documents in the collection should be similar in structure to the following example:

Plain Text
_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below"
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"

These documents represent Ad Campaign events. The events collection stores details of the advertiser, campaign, and various demographic information about the user, such as gender and income. SingleStore Kai A previous article showed the steps to create a free SingleStoreDB Cloud account.
We'll use the following settings:
Workspace Group Name: CDC Demo Group
Cloud Provider: AWS
Region: US East 1 (N. Virginia)
Workspace Name: cdc-demo
Size: S-00
Settings: SingleStore Kai selected

Once the workspace is available, we'll make a note of our password and host. The host will be available from CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host. We'll need this information later for Metabase. We'll also temporarily allow access from anywhere by configuring the firewall under CDC Demo Group > Firewall. From the left navigation pane, we'll select DEVELOP > SQL Editor to create an adtech database and link, as follows:

SQL
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;

DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017", "collection.include.list": "adtech.*", "mongodb.ssl.enabled": "true", "mongodb.authsource": "admin", "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>", "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;

We'll replace <username> and <password> with the values that we saved earlier from MongoDB Atlas. We'll also need to replace the values for <primary>, <secondary> and <secondary> with the full address of each from MongoDB Atlas. We'll now check for any tables, as follows:

SQL
SHOW TABLES;

This should show one table called events:

Plain Text
+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

We'll check the structure of the table:

SQL
DESCRIBE events;

The output should be as follows:

Plain Text
+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

Next, we'll check for any pipelines:

SQL
SHOW PIPELINES;

This will show one pipeline called events that is currently Stopped:

Plain Text
+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

Now we'll start the events pipeline:

SQL
START ALL PIPELINES;

and the state should change to Running:

Plain Text
+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

If we now run the following command:

SQL
SELECT COUNT(*) FROM events;

it should return 100 as the result:

Plain Text
+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

We'll check one row in the events table, as follows:

SQL
SELECT * FROM events LIMIT 1;

The output should be similar to the following:

Plain Text
+--------------------------------------+-----------------------------------------------------------------+
| _id                                  | _more                                                           |
+--------------------------------------+-----------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-----------------------------------------------------------------+

The CDC solution has successfully connected to MongoDB Atlas and replicated all 100 records to SingleStore Kai. Let's now create a dashboard using Metabase. Metabase Details of how to install, configure, and create a connection to Metabase were described in a previous article. We'll create visualizations using slight variations of the queries used in the earlier article.

1. Total Number of Events

SQL
SELECT COUNT(*) FROM events;

2. Events by Region

SQL
SELECT _more::country AS `events.country`,
       COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. Events by Top 5 Advertisers

SQL
SELECT _more::advertiser AS `events.advertiser`,
       COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%'
    OR _more::advertiser LIKE '%McDonals%'
    OR _more::advertiser LIKE '%Starbucks%'
    OR _more::advertiser LIKE '%Dollar General%'
    OR _more::advertiser LIKE '%YUM! Brands%'
    OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. Ad Visitors by Gender and Income

SQL
SELECT *
FROM (SELECT *,
             DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank,
             RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering,
             CASE WHEN xx.z___min_rank = xx.z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell
      FROM (SELECT *,
                   MIN(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
            FROM (SELECT *,
                         RANK() OVER (ORDER BY CASE WHEN bb.z__pivot_col_rank = 1 THEN (CASE WHEN bb.`events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END,
                                      CASE WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count` ELSE NULL END DESC,
                                      bb.`events.count` DESC,
                                      bb.z__pivot_col_rank,
                                      bb.`events.income`) AS z___rank
                  FROM (SELECT *,
                               DENSE_RANK() OVER (ORDER BY CASE WHEN ww.`events.gender` IS NULL THEN 1 ELSE 0 END, ww.`events.gender`) AS z__pivot_col_rank
                        FROM (SELECT _more::gender AS `events.gender`,
                                     _more::income AS `events.income`,
                                     COUNT(*) AS `events.count`
                              FROM adtech.events AS events
                              WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                              GROUP BY 1, 2) ww) bb
                  WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1)
  AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

Figure 2 shows an example of the charts sized and positioned on the AdTech dashboard. We'll set the auto-refresh option to 1 minute. Figure 2. Final Dashboard
If we load more data into MongoDB Atlas using the Jupyter notebook by changing MAX_ITERATIONS, we'll see the data propagated to SingleStore Kai and the new data reflected in the AdTech dashboard. Summary In this article, we created a CDC pipeline to augment MongoDB Atlas with SingleStore Kai. SingleStore Kai can be used for analytics due to its far superior performance, as highlighted by several benchmarks. We also used Metabase to create a quick visual dashboard to help us gain insights into our Ad Campaign.
In today's highly competitive landscape, businesses must be able to gather, process, and react to data in real-time in order to survive and thrive. Whether it's detecting fraud, personalizing user experiences, or monitoring systems, near-instant data is now a need, not a nice-to-have. However, building and running mission-critical, real-time data pipelines is challenging. The infrastructure must be fault-tolerant, infinitely scalable, and integrated with various data sources and applications. This is where leveraging Apache Kafka, Python, and cloud platforms comes in handy. In this comprehensive guide, we will cover: An overview of Apache Kafka architecture Running Kafka clusters on the cloud Building real-time data pipelines with Python Scaling processing using PySpark Real-world examples like user activity tracking, IoT data pipeline, and support chat analysis We will include plenty of code snippets, configuration examples, and links to documentation along the way for you to get hands-on experience with these incredibly useful technologies. Let's get started! Apache Kafka Architecture 101 Apache Kafka is a distributed, partitioned, replicated commit log for storing streams of data reliably and at scale. At its core, Kafka provides the following capabilities: Publish-subscribe messaging: Kafka lets you broadcast streams of data like page views, transactions, user events, etc., from producers and consume them in real-time using consumers. Message storage: Kafka durably persists messages on disk as they arrive and retains them for specified periods. Messages are stored and indexed by an offset indicating the position in the log. Fault tolerance: Data is replicated across configurable numbers of servers. If a server goes down, another can ensure continuous operations. Horizontal scalability: Kafka clusters can be elastically scaled by simply adding more servers. This allows for unlimited storage and processing capacity. Kafka architecture consists of the following main components: Topics Messages are published to categories called topics. Each topic acts as a feed or queue of messages. A common scenario is a topic per message type or data stream. Each message in a Kafka topic has a unique identifier called an offset, which represents its position in the topic. A topic can be divided into multiple partitions, which are segments of the topic that can be stored on different brokers. Partitioning allows Kafka to scale and parallelize the data processing by distributing the load among multiple consumers. Producers These are applications that publish messages to Kafka topics. They connect to the Kafka cluster, serialize data (say, to JSON or Avro), assign a key, and send it to the appropriate topic. For example, a web app can produce clickstream events, or a mobile app can produce usage stats. Consumers Consumers read messages from Kafka topics and process them. Processing may involve parsing data, validation, aggregation, filtering, storing to databases, etc. Consumers connect to the Kafka cluster and subscribe to one or more topics to get feeds of messages, which they then handle as per the use case requirements. Brokers This is the Kafka server that receives messages from producers, assigns offsets, commits messages to storage, and serves data to consumers. Kafka clusters consist of multiple brokers for scalability and fault tolerance. ZooKeeper ZooKeeper handles coordination and consensus between brokers like controller election and topic configuration. 
It maintains cluster state and configuration info required for Kafka operations. This covers the Kafka basics. For an in-depth understanding, refer to the excellent Kafka documentation. Now, let's look at simplifying management by running Kafka in the cloud. Kafka in the Cloud While Kafka is highly scalable and reliable, operating it involves significant effort related to deployment, infrastructure management, monitoring, security, failure handling, upgrades, etc. Thankfully, Kafka is now available as a fully managed service from all major cloud providers:

AWS MSK: Fully managed, highly available Apache Kafka clusters on AWS. Handles infrastructure, scaling, security, failure handling, etc. Pricing is based on the number of brokers.
Google Cloud Pub/Sub: Serverless, real-time messaging service. Auto-scaling, with at-least-once delivery guarantees. Pricing is based on usage metrics.
Confluent Cloud: Fully managed event streaming platform powered by Apache Kafka. Free tier available. Tiered pricing based on features.
Azure Event Hubs: High-throughput event ingestion service with Apache Kafka protocol support. Integrations with Azure data services. Pricing is based on throughput units.

The managed services abstract away the complexities of Kafka operations and let you focus on your data pipelines. Next, we will build a real-time pipeline with Python, Kafka, and the cloud. You can also refer to the following guide as another example. Building Real-Time Data Pipelines A basic real-time pipeline with Kafka has two main components: a producer that publishes messages to Kafka and a consumer that subscribes to topics and processes the messages. The architecture follows this flow: producers publish events to a Kafka topic, and consumers read from that topic and process the events. We will use the Confluent Kafka Python client library for simplicity. 1. Python Producer The producer application gathers data from sources and publishes it to Kafka topics. As an example, let's say we have a Python service collecting user clickstream events from a web application. When a user performs an action such as a page view or product rating, we can capture these events and send them to Kafka. We can abstract away the implementation details of how the web app collects the data.

Python
from confluent_kafka import Producer
import json

# User event data
event = {
    "timestamp": "2022-01-01T12:22:25",
    "userid": "user123",
    "page": "/product123",
    "action": "view"
}

# Convert to JSON
event_json = json.dumps(event)

# Kafka producer configuration
conf = {
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'client.id': 'clickstream-producer'
}

# Create producer instance
producer = Producer(conf)

# Publish event
producer.produce(topic='clickstream', value=event_json)

# Flush any buffered messages to the broker
producer.flush()

This publishes the event to the clickstream topic on our cloud-hosted Kafka cluster. The confluent_kafka Python client uses an internal buffer to batch messages before sending them to Kafka. This improves efficiency compared to sending each message individually. By default, messages are accumulated in the buffer until either the buffer size limit is reached (default 32 MB) or the flush() method is called. When flush() is called, any messages in the buffer are immediately sent to the Kafka broker. If we did not call flush(), and instead relied on the buffer size limit, there would be a risk of losing events in the event of a failure before the next auto-flush. Calling flush() gives us greater control to minimize potential message loss. However, calling flush() after every produce call introduces additional overhead.
Finding the right buffering configuration depends on our specific reliability needs and throughput requirements. We can keep adding events as they occur to build a live stream. This gives downstream data consumers a continual feed of events. 2. Python Consumer Next, we have a consumer application to ingest events from Kafka and process them. For example, we may want to parse events, filter for a certain subtype, and validate the schema.

Python
from confluent_kafka import Consumer
import json

# Kafka consumer configuration
conf = {
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'group.id': 'clickstream-processor',
    'auto.offset.reset': 'earliest'
}

# Create consumer instance
consumer = Consumer(conf)

# Subscribe to 'clickstream' topic
consumer.subscribe(['clickstream'])

try:
    # Poll Kafka for messages indefinitely
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue

        # Parse JSON from message value
        event = json.loads(msg.value())

        # Process event based on business logic
        if event['action'] == 'view':
            print('User viewed product page')
        elif event['action'] == 'rating':
            # Validate rating, insert to DB, etc.
            pass

        print(event)  # Print event
finally:
    # Close consumer
    consumer.close()

This polls the clickstream topic for new messages, consumes them, and takes action based on the event type - prints, updates a database, etc. For a simple pipeline, this works well. But what if we get 100x more events per second? The consumer will not be able to keep up. This is where a tool like PySpark helps scale out processing. 3. Scaling With PySpark PySpark provides a Python API for Apache Spark, a distributed computing framework optimized for large-scale data processing. With PySpark, we can leverage Spark's in-memory computing and parallel execution to consume Kafka streams faster. First, we load Kafka data into a DataFrame, which can be manipulated using Spark SQL or Python.

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Initialize Spark session
spark = SparkSession.builder \
    .appName('clickstream-consumer') \
    .getOrCreate()

# Schema of the clickstream events produced earlier
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("userid", StringType()),
    StructField("page", StringType()),
    StructField("action", StringType())
])

# Read stream from Kafka 'clickstream'
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "clickstream") \
    .load()

# Parse JSON from value
df = df.selectExpr("CAST(value AS STRING)")
df = df.select(from_json(col("value"), schema).alias("data"))

Next, we can express whatever processing logic we need using DataFrame transformations:

Python
from pyspark.sql.functions import *

# Filter for 'page view' events
views = df.filter(col("data.action") == "view")

# Count views per page URL
counts = views.groupBy(col("data.page")) \
    .count() \
    .orderBy("count")

# Print the stream to the console
query = counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

This applies operations like filter, aggregate, and sort on the stream in real-time, leveraging Spark's distributed runtime. We can also parallelize consumption using multiple consumer groups and write the output to sinks such as databases, cloud storage, etc. This allows us to build scalable stream processing on data from Kafka. Now that we've covered the end-to-end pipeline, let's look at some real-world examples of applying it. Real-World Use Cases Let's explore some practical use cases where these technologies can help process huge amounts of real-time data at scale. User Activity Tracking Many modern web and mobile applications track user actions like page views, button clicks, transactions, etc., to gather usage analytics.
Problem: Data volumes can scale massively with millions of active users. Insights are needed in real-time to detect issues and personalize content. Aggregate data must be stored for historical reporting. Solution: Ingest clickstream events into Kafka topics using Python or any other language. Process the events using PySpark for cleansing, aggregations, and analytics. Save the output to databases like Cassandra for dashboards. Detect anomalies using Spark ML for real-time alerting. IoT Data Pipeline IoT sensors generate massive volumes of real-time telemetry like temperature, pressure, location, etc. Problem: Millions of sensor events per second. The data requires cleaning, transforming, and enriching. Real-time monitoring and historical storage are both needed. Solution: Collect sensor data in Kafka topics using language SDKs. Use PySpark for data wrangling and joining external data. Feed the stream into ML models for real-time predictions. Store aggregate data in a time series database for visualization. Customer Support Chat Analysis Chat platforms like Zendesk capture huge amounts of customer support conversations. Problem: Millions of chat messages per month. The business needs to understand customer pain points and agent performance. Negative sentiment and urgent issues must be detected. Solution: Ingest chat transcripts into Kafka topics using a connector. Aggregate and process them using PySpark SQL and DataFrames. Feed the data into NLP models to classify sentiment and intent. Store insights in a database for historical reporting. Present real-time dashboards for contact center ops. This demonstrates applying the technologies to real business problems involving massive, fast-moving data. Learn More To summarize, we looked at how Python, Kafka, and the cloud provide a great combination for building robust, scalable real-time data pipelines.
Miguel Garcia, VP of Engineering, Nextail Labs
Gautam Goswami, Founder, DataView
Alexander Eleseev, Full Stack Developer, First Line Software