This article builds on an earlier article describing the AIDocumentLibraryChat project, a RAG-based search service built on the OpenAI Embedding/GPT model services. The AIDocumentLibraryChat project has now been extended with the option to use local AI models with the help of Ollama. The advantage is that the documents never leave the local servers, which is a solution for cases where it is prohibited to transfer documents to an external service.

Architecture

With Ollama, the AI model can run on a local server. All needed systems can then be deployed in a local environment that is controlled by the local organization. An example would be to deploy the AIDocumentLibraryChat application, the PostgreSQL DB, and the Ollama-based AI model in a local Kubernetes cluster and to provide user access to the AIDocumentLibraryChat application with an ingress. With this architecture, only the results are exposed by the AIDocumentLibraryChat application and can be accessed by external parties.

The system architecture has the UI for the user and the application logic in the AIDocumentLibraryChat application. The application uses Spring AI with the ONNX library functions to create the embeddings of the documents. The embeddings and documents are stored with JDBC in the PostgreSQL database with the vector extension. To create the answers based on the document/paragraph content, the Ollama-based model is called with REST. The AIDocumentLibraryChat application, the PostgreSQL DB, and the Ollama-based model can be packaged in Docker images and deployed in a Kubernetes cluster, which makes the system independent of external systems. The Ollama models support the needed GPU acceleration on the server. The shell commands to use the Ollama Docker image are in the runOllama.sh file, and the shell commands to use the PostgreSQL Docker image with the vector extension are in the runPostgresql.sh file.
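As a quick smoke test of the local model server, independent of the Java application, the Ollama REST endpoint mentioned above can be called directly. The following minimal Python sketch assumes Ollama's default localhost:11434 endpoint and the stable-beluga:13b model used later in this article (the model must already be pulled):

Python

import requests

# Ask the locally running Ollama server for a completion (non-streaming)
response = requests.post(
    "http://localhost:11434/api/generate",
    json={
        "model": "stable-beluga:13b",  # assumed: the model used in this article
        "prompt": "Summarize what a vector database is.",
        "stream": False,
    },
    timeout=300,
)
print(response.json()["response"])

If the server answers, the Spring AI configuration described below only needs the matching base URL and model name.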
Building the Application for Ollama

The Gradle build of the application has been updated to switch off OpenAI support and switch on Ollama support with the useOllama property:

Kotlin

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.1'
    id 'io.spring.dependency-management' version '1.1.4'
}

group = 'ch.xxx'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '21'
}

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.ai:spring-ai-tika-document-reader:0.8.0-SNAPSHOT'
    implementation 'org.liquibase:liquibase-core'
    implementation 'net.javacrumbs.shedlock:shedlock-spring:5.2.0'
    implementation 'net.javacrumbs.shedlock:shedlock-provider-jdbc-template:5.2.0'
    implementation 'org.springframework.ai:spring-ai-pgvector-store-spring-boot-starter:0.8.0-SNAPSHOT'
    implementation 'org.springframework.ai:spring-ai-transformers-spring-boot-starter:0.8.0-SNAPSHOT'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.security:spring-security-test'
    testImplementation 'com.tngtech.archunit:archunit-junit5:1.1.0'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

    if (project.hasProperty('useOllama')) {
        implementation 'org.springframework.ai:spring-ai-ollama-spring-boot-starter:0.8.0-SNAPSHOT'
    } else {
        implementation 'org.springframework.ai:spring-ai-openai-spring-boot-starter:0.8.0-SNAPSHOT'
    }
}

bootJar {
    archiveFileName = 'aidocumentlibrarychat.jar'
}

tasks.named('test') {
    useJUnitPlatform()
}

The Gradle build adds the Ollama Spring Starter and the embedding library inside the if (project.hasProperty('useOllama')) statement; otherwise, it adds the OpenAI Spring Starter.

Database Setup

The application needs to be started with the Spring Profile 'ollama' to switch on the features needed for Ollama support. The database setup needs a different embedding vector type that is changed with the application-ollama.properties file:

Properties files

...
spring.liquibase.change-log=classpath:/dbchangelog/db.changelog-master-ollama.xml
...

The spring.liquibase.change-log property sets the Liquibase script that includes the Ollama initialization. That script includes the db.changelog-1-ollama.xml script with the initialization:

XML

<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
        http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">
    <changeSet id="8" author="angular2guy">
        <modifyDataType tableName="vector_store" columnName="embedding" newDataType="vector(384)"/>
    </changeSet>
</databaseChangeLog>

The script changes the column type of the embedding column to vector(384) to support the format that is created by the Spring AI ONNX Embedding library.
Add Ollama Support to the Application

To support Ollama-based models, the application-ollama.properties file has been added:

Properties files

spring.ai.ollama.base-url=${OLLAMA-BASE-URL:http://localhost:11434}
spring.ai.ollama.model=stable-beluga:13b
spring.liquibase.change-log=classpath:/dbchangelog/db.changelog-master-ollama.xml
document-token-limit=150

The spring.ai.ollama.base-url property sets the URL used to access the Ollama model. The spring.ai.ollama.model property sets the name of the model that runs in Ollama. The document-token-limit property sets the number of tokens that the model gets as context from the document/paragraph. The DocumentService has new features to support the Ollama models:

Java

private final String systemPrompt = "You're assisting with questions about documents in a catalog.\n" +
    "Use the information from the DOCUMENTS section to provide accurate answers.\n" +
    "If unsure, simply state that you don't know.\n" +
    "\n" +
    "DOCUMENTS:\n" +
    "{documents}";

private final String ollamaPrompt = "You're assisting with questions about documents in a catalog.\n" +
    "Use the information from the DOCUMENTS section to provide accurate answers.\n" +
    "If unsure, simply state that you don't know.\n \n" +
    " {prompt} \n \n" +
    "DOCUMENTS:\n" +
    "{documents}";

@Value("${embedding-token-limit:1000}")
private Integer embeddingTokenLimit;
@Value("${document-token-limit:1000}")
private Integer documentTokenLimit;
@Value("${spring.profiles.active:}")
private String activeProfile;

The Ollama integration supports only a system prompt, which requires a new prompt template that embeds the user prompt in the {prompt} placeholder. The embeddingTokenLimit and the documentTokenLimit are now set in the application properties and can be adjusted for the different profiles. The activeProfile property gets the space-separated list of the profiles the application was started with.

Java

public Long storeDocument(Document document) {
    ...
    var aiDocuments = tikaDocuments.stream()
        .flatMap(myDocument1 -> this.splitStringToTokenLimit(myDocument1.getContent(), embeddingTokenLimit)
            .stream()
            .map(myStr -> new TikaDocumentAndContent(myDocument1, myStr)))
        .map(myTikaRecord -> new org.springframework.ai.document.Document(myTikaRecord.content(),
            myTikaRecord.document().getMetadata()))
        .peek(myDocument1 -> myDocument1.getMetadata().put(ID, document.getId().toString()))
        .peek(myDocument1 -> myDocument1.getMetadata().put(MetaData.DATATYPE,
            MetaData.DataType.DOCUMENT.toString()))
        .toList();
    ...
}

public AiResult queryDocuments(SearchDto searchDto) {
    ...
    Message systemMessage = switch (searchDto.getSearchType()) {
        case SearchDto.SearchType.DOCUMENT -> this.getSystemMessage(documentChunks,
            this.documentTokenLimit, searchDto.getSearchString());
        case SearchDto.SearchType.PARAGRAPH -> this.getSystemMessage(mostSimilar.stream().toList(),
            this.documentTokenLimit, searchDto.getSearchString());
        ...
    };
    ...
}

// Parameter list reconstructed from the call sites in queryDocuments(...) above
private Message getSystemMessage(List<Document> similarDocuments, int tokenLimit, String prompt) {
    String documentStr = this.cutStringToTokenLimit(
        similarDocuments.stream().map(entry -> entry.getContent())
            .filter(myStr -> myStr != null && !myStr.isBlank())
            .collect(Collectors.joining("\n")),
        tokenLimit);
    SystemPromptTemplate systemPromptTemplate = this.activeProfile.contains("ollama")
        ? new SystemPromptTemplate(this.ollamaPrompt)
        : new SystemPromptTemplate(this.systemPrompt);
    Message systemMessage = systemPromptTemplate.createMessage(
        Map.of("documents", documentStr, "prompt", prompt));
    return systemMessage;
}
The storeDocument(...) method now uses the embeddingTokenLimit from the properties file to limit the size of the text chunks used to create the embeddings. The queryDocuments(...) method now uses the documentTokenLimit from the properties file to limit the size of the text chunk provided to the model for generation. The SystemPromptTemplate selection checks the activeProfile property for the ollama profile and creates the SystemPromptTemplate that includes the question. The createMessage(...) method creates the AI message and replaces the documents and prompt placeholders in the prompt string.

Conclusion

Spring AI works very well with Ollama. The model used in the Ollama Docker container was stable-beluga:13b. The only differences in the implementation were the changed dependencies and the missing user prompt for the Llama models, and that is a small fix. Spring AI enables very similar implementations for external AI services like OpenAI and for local AI services like Ollama-based models, which decouples the Java code from the AI model interfaces very well.

Without GPU acceleration, the performance of the Ollama models required lowering the document-token-limit from 2000 (OpenAI) to 150 (Ollama), and the quality of the AI model answers decreased accordingly. To run an Ollama model with parameters that produce better-quality answers at acceptable response times, a server with GPU acceleration is required. For commercial/production use, a model with an appropriate license is required; that is not the case for the Beluga models, but the falcon:40b model could be used instead.
This tutorial illustrates B2B push-style application integration with APIs and internal integration with messages. We have the following use cases:

- Ad hoc requests for information (Sales, Accounting) that cannot be anticipated in advance.
- Two transaction sources: A) the internal Order Entry UI, and B) the B2B partner OrderB2B API.

The Northwind API Logic Server provides APIs and logic for both transaction sources:

- Self-serve APIs to support ad hoc integration and UI development, providing security (e.g., customers see only their accounts).
- Order logic: enforcing database integrity and application integration (alert shipping).
- A custom API to match an agreed-upon format for B2B partners.

The Shipping API Logic Server listens to Kafka and processes the message.

Key Architectural Requirements: Self-Serve APIs and Shared Logic

This sample illustrates some key architectural considerations:

Requirement | Poor Practice | Good Practice | Best Practice | Ideal
Ad Hoc Integration | ETL | APIs | Self-Serve APIs | Automated Self-Serve APIs
Logic | Logic in UI | Reusable Logic | Declarative Rules | Declarative Rules, extensible with Python
Messages | | | Kafka | Kafka Logic Integration

We'll further expand on these topics as we build the system, but we note some best practices:

- APIs should be self-serve, not requiring continuing server development.
- APIs avoid the nightly Extract, Transform, and Load (ETL) overhead.
- Logic should be reused over the UI and API transaction sources.
- Logic in UI controls is undesirable since it cannot be shared with APIs and messages.

Using This Guide

This guide was developed with API Logic Server, which is open-source and available here. The guide shows the highlights of creating the system. The complete Tutorial in the Appendix contains detailed instructions to create the entire running system. The information here is abbreviated for clarity.

Development Overview

This overview shows all the key code and procedures to create the system above. We'll be using API Logic Server, which consists of a CLI plus a set of runtimes for automating APIs, logic, messaging, and an admin UI. It's an open-source Python project with a standard pip install.

1. ApiLogicServer Create: Instant Project

The CLI command below creates an ApiLogicProject by reading your schema. The database is Northwind (Customer, Orders, Items, and Product), as shown in the Appendix. Note: the db_url value is an abbreviation; you normally supply a SQLAlchemy URL. The sample NW SQLite database is included in ApiLogicServer for demonstration purposes.

$ ApiLogicServer create --project_name=ApiLogicProject --db_url=nw-

The created project is executable; it can be opened in an IDE and run. One command has created meaningful elements of our system: an API for ad hoc integration and an Admin App. Let's examine these below.

API: Ad Hoc Integration

The system creates a JSON:API with endpoints for each table, providing filtering, sorting, pagination, optimistic locking, and related data access. JSON:APIs are self-serve: consumers can select their own attributes and related data, eliminating reliance on custom API development. In this sample, our self-serve API meets our ad hoc integration needs and unblocks custom UI development.

Admin App: Order Entry UI

The create command also creates an Admin App: multi-page, multi-table, with automatic joins, ready for business-user agile collaboration and back-office data maintenance. This complements custom UIs you can create with the API. Multi-page navigation controls enable users to explore data and relationships.
For example, they might click the first Customer and see their Orders and Items.

We created an executable project with one command that completes our ad hoc integration with a self-serve API.

2. Customize: In Your IDE

While API/UI automation is a great start, we now require custom APIs, logic, and security. Such customizations are added in your IDE, leveraging all its services for code completion, debugging, etc. Let's examine these.

Declare UI Customizations

The admin app is not built with complex HTML and JavaScript. Instead, it is configured with ui/admin/admin.yml, automatically created from your data model by the ApiLogicServer create command. You can customize this file in your IDE to control which fields are shown (including joins), hide/show conditions, help text, etc. This makes it convenient to use the Admin App to enter an Order and OrderDetails.

Note the automation for automatic joins (Product Name, not ProductId) and lookups (select from a list of Products to obtain the foreign key). If we attempt to order too much Chai, the transaction properly fails due to the Check Credit logic described below.

Check Credit Logic: Multi-Table Derivation and Constraint Rules, 40X More Concise

Such logic (multi-table derivations and constraints) is a significant portion of a system, typically nearly half. API Logic Server provides spreadsheet-like rules that dramatically simplify and accelerate logic development. The five Check Credit rules (see the sketch at the end of this section) represent the same logic as 200 lines of traditional procedural code. Rules are 40X more concise than traditional code, as shown here.

Rules are declared in Python and simplified with IDE code completion. Rules can be debugged using standard logging and the debugger. Rules operate by handling SQLAlchemy events, so they apply to all ORM access, whether by the API engine or your custom code. Once declared, you don't need to remember to call them, which promotes quality.

The above rules prevented the too-big order with multi-table logic: copying the Product Price, computing the Amount, rolling it up to the AmountTotal and Balance, and checking the credit. These five rules also govern changing orders, deleting them, picking different parts, and about nine automated transactions. Implementing all this by hand would otherwise require about 200 lines of code.

Rules are a unique and significant innovation, providing meaningful improvements over procedural logic:

Characteristic | Procedural | Declarative | Why It Matters
Reuse | Not Automatic | Automatic - all Use Cases | 40X Code Reduction
Invocation | Passive - only if called | Active - call not required | Quality
Ordering | Manual | Automatic | Agile Maintenance
Optimizations | Manual | Automatic | Agile Design

For more on the rules, click here.

Declare Security: Customers See Only Their Own Row

Declare row-level security using your IDE to edit logic/declare_security.sh (see the screenshot below). An automatically created admin app enables you to configure roles, users, and user roles. If users now log in as ALFKI (configured with the role customer), they see only their customer row. Observe that the console log at the bottom shows how the filter worked. Declarative row-level security ensures users see only the rows authorized for their roles.

3. Integrate: B2B and Shipping

We now have a running system: an API, logic, security, and a UI. Now, we must integrate with the following:

- B2B partners: We'll create a B2B Custom Resource.
- OrderShipping: We add logic to send an OrderShipping message.
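For readers who want to see what such rule declarations look like, here is a hedged sketch of a Check Credit rule set in Python. It is an illustration based on the Northwind names used in this tutorial; treat the exact Rule methods and keyword arguments as assumptions to verify against the API Logic Server documentation.

Python

from logic_bank.logic_bank import Rule
from database import models  # assumed project layout: models generated by ApiLogicServer create

def declare_logic():
    # Constraint: reject the transaction if the customer's balance exceeds the credit limit
    Rule.constraint(validate=models.Customer,
                    as_condition=lambda row: row.Balance <= row.CreditLimit,
                    error_msg="balance exceeds credit limit")
    # Derivation: the customer's balance is the sum of the unshipped order totals
    Rule.sum(derive=models.Customer.Balance,
             as_sum_of=models.Order.AmountTotal,
             where=lambda row: row.ShippedDate is None)
    # Derivation: the order total is the sum of its line-item amounts
    Rule.sum(derive=models.Order.AmountTotal, as_sum_of=models.OrderDetail.Amount)
    # Derivation: line-item amount = quantity * unit price
    Rule.formula(derive=models.OrderDetail.Amount,
                 as_expression=lambda row: row.Quantity * row.UnitPrice)
    # Derivation: copy the product's unit price into the line item
    Rule.copy(derive=models.OrderDetail.UnitPrice, from_parent=models.Product.UnitPrice)

The point of the sketch is the shape of the logic: five declarations that the engine orders, invokes, and optimizes automatically, rather than 200 lines of hand-written event code.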
B2B Custom Resource

The self-serve API does not conform to the format required for a B2B partnership, so we need to create a custom resource. You can create custom resources by editing customize_api.py using standard Python, Flask, and SQLAlchemy. A custom OrderB2B endpoint is shown below.

The main task here is to map a B2B payload onto our logic-enabled SQLAlchemy rows. API Logic Server provides a declarative RowDictMapper class you can use as follows:

1. Declare the row/dict mapping; see the OrderB2B class in the lower pane. Note the support for lookups, so that partners can send ProductNames, not ProductIds.
2. Create the custom API endpoint; see the upper pane:
   - Add def OrderB2B to customize_api.py to create a new endpoint.
   - Use the OrderB2B class to transform the API request data to SQLAlchemy rows (dict_to_row).
   - The automatic commit initiates the shared logic described above to check credit and reorder products.

Our custom endpoint required under ten lines of code plus the mapper configuration.

Produce OrderShipping Message

Successful orders must be sent to Shipping in a predesignated format. We could certainly POST to an API, but messaging (here, Kafka) provides significant advantages:

- Async: Our system will not be impacted if the Shipping system is down. Kafka will save the message and deliver it when Shipping is back up.
- Multi-cast: We can send a message that multiple systems (e.g., Accounting) can consume.

The content of the message is a JSON string, just like an API. Just as you can customize APIs, you can complement rule-based logic using Python events:

1. Declare the mapping; see the OrderShipping class in the right pane. This formats our Kafka message content in the format agreed upon with Shipping.
2. Define an after_flush event, which invokes send_order_to_shipping. This is called by the logic engine, which passes the SQLAlchemy models.Order row.
3. send_order_to_shipping uses OrderShipping.row_to_dict to map our SQLAlchemy order row to a dict and uses the Kafka producer to publish the message.

Rule-based logic is customizable with Python, producing a Kafka message with 20 lines of code here.

4. Consume Messages

The Shipping system illustrates how to consume messages. The sections below show how to create/start the Shipping server and how to use our IDE to add the consuming logic.

Create/Start the Shipping Server

This shipping database was created with AI. To simplify matters, API Logic Server has installed the shipping database automatically. We can, therefore, create the project from this database and start it:

1. Create the Shipping project: ApiLogicServer create --project_name=shipping --db_url=shipping
2. Start your IDE (e.g., code shipping) and establish your venv.
3. Start the Shipping server: F5 (configured to use a different port).

The core Shipping system was automated by ChatGPT and ApiLogicServer create. We add 15 lines of code to consume Kafka messages, as shown below.

Consuming Logic

To consume messages, we enable message consumption, configure a mapping, and provide a message handler as follows.

1. Enable Consumption

Shipping is pre-configured to enable message consumption with a setting in config.py:

KAFKA_CONSUMER = '{"bootstrap.servers": "localhost:9092", "group.id": "als-default-group1", "auto.offset.reset":"smallest"}'

When the server is started, it invokes flask_consumer() (shown below). This calls the pre-supplied FlaskKafka, which handles the Kafka consumption (listening), thread management, and the handle annotation used below.
This housekeeping task is pre-created automatically. FlaskKafka was inspired by the work of Nimrod (Kevin) Maina in this project. Many thanks!

2. Configure a Mapping

As we did for our OrderB2B custom resource, we configure an OrderToShip mapping class to map the message onto our SQLAlchemy Order object.

3. Provide a Consumer Message Handler

We provide the order_shipping handler in kafka_consumer.py:

- Annotate the topic handler method, providing the topic name. This is used by FlaskKafka to establish a Kafka listener.
- Provide the topic handler code, leveraging the mapper noted above. It is called by FlaskKafka per the method annotation.

Test It

You can use your IDE terminal window to simulate a business partner posting a B2BOrder. You can set breakpoints in the code described above to explore system operation.

ApiLogicServer curl "'POST' 'http://localhost:5656/api/ServicesEndPoint/OrderB2B'" --data '
{"meta": {"args": {"order": {
    "AccountId": "ALFKI",
    "Surname": "Buchanan",
    "Given": "Steven",
    "Items": [
        {"ProductName": "Chai", "QuantityOrdered": 1},
        {"ProductName": "Chang", "QuantityOrdered": 2}
    ]
}}}}'

Use Shipping's Admin App to verify that the Order was processed.

Summary

These applications have demonstrated several types of application integration:

- Ad hoc integration via self-serve APIs.
- Custom integration via custom APIs to support business agreements with B2B partners.
- Message-based integration to decouple internal systems by removing the requirement that all systems must always be running.

We have also illustrated several technologies noted in the Ideal column:

Requirement | Poor Practice | Good Practice | Best Practice | Ideal
Ad Hoc Integration | ETL | APIs | Self-Serve APIs | Automated Creation of Self-Serve APIs
Logic | Logic in UI | Reusable Logic | Declarative Rules | Declarative Rules, extensible with Python
Messages | | | Kafka | Kafka Logic Integration

API Logic Server provides automation for the ideal practices noted above:

1. Creation: an instant ad hoc API (and Admin UI) with the ApiLogicServer create command.
2. Declarative Rules: security and multi-table logic reduce the backend half of your system by 40X.
3. Kafka Logic Integration:
   - Produce messages from logic events.
   - Consume messages by extending kafka_consumer.
   - Services, including RowDictMapper to transform rows and dicts, and FlaskKafka for Kafka consumption, threading, and annotation invocation.
4. Standards-based customization:
   - Standard packages: Python, Flask, SQLAlchemy, Kafka...
   - Using standard IDEs.

Creation, logic, and integration automation have enabled us to build two non-trivial systems with a remarkably small amount of code:

Type | Code
Custom B2B API | 10 lines
Check Credit Logic | 5 rules
Row Level Security | 1 security declaration
Send Order to Shipping | 20 lines
Process Order in Shipping | 30 lines
Mapping configurations to transform rows and dicts | 45 lines

Automation dramatically shortens time to market, with standards-based customization using your IDE, Python, Flask, SQLAlchemy, and Kafka. For more information on API Logic Server, click here.

Appendix

Full Tutorial

You can recreate this system and explore the running code, including Kafka; click here. It should take 30-60 minutes, depending on whether you already have Python and an IDE installed.

Sample Database

The sample database is a SQLite version of Northwind (Customer, Order, OrderDetail, and Product). To see a database diagram, click here. This database is included when you pip install ApiLogicServer.
If you’re not yet familiar with the open-source pgvector extension for PostgreSQL, now’s the time to do so. The tool is extremely helpful for searching text data fast, without needing a specialized database to store embeddings.

Embeddings represent word similarity and are stored as vectors (a list of numbers). For example, the words “tree” and “bush” are related more closely than “tree” and “automobile.” The open-source pgvector tool makes it possible to search for closely related vectors and find text with the same semantic meaning. This is a major advance for text-based data, and an especially valuable tool for building Large Language Models (LLMs)... and who isn’t right now?

By turning PostgreSQL into a high-performance vector store with distance-based embedding search capabilities, pgvector allows users to explore vast textual data easily. It supports both exact and approximate nearest neighbor search using L2 (Euclidean) distance, inner product, and cosine distance; cosine distance is recommended by OpenAI for capturing semantic similarities efficiently.

Using Embeddings in Retrieval Augmented Generation (RAG) and LLMs

Embeddings can play a valuable role in the Retrieval Augmented Generation (RAG) process, which is used to supply LLMs with new knowledge. The process includes retrieving relevant information from an external source, transforming it into an LLM-digestible format, and then feeding it to the LLM to generate text output.

Let’s put an example to it. Searching documentation for answers to technical problems is something I’d bet anyone here has wasted countless hours on. For the example below, using documentation as the source, you can generate embeddings to store in PostgreSQL. When a user queries that documentation, the embeddings make it possible to represent the words in a query as vector numbers, perform a similarity search, and retrieve relevant pieces of the documentation from the database. The user’s query and the retrieved documentation are both passed to the LLM, which accurately delivers relevant documentation and sources that answer the query.

We tested out pgvector and embeddings using our own documentation at Instaclustr. Here are some example user search phrases to demonstrate how embeddings will plot them relative to one another:

- “Configure hard drive failure setting in Apache Cassandra”
- “Change storage settings in Redis”
- “Enterprise pricing for a 2-year commitment”
- “Raise a support ticket”
- “Connect to PostgreSQL using WebSockets”

Embeddings plot the first two phrases nearest each other, even though they include none of the same words.

The LLM Context Window

Each LLM has a context window: the number of tokens it can process at once. This can be a challenge: models with a limited context window can falter with large inputs, while models trained with large context windows (100,000 tokens, or enough to use a full book in a prompt) suffer from latency and must store that full context in memory. The goal is to use the smallest possible context window that generates useful answers. Embeddings help by making it possible to provide the LLM with only the data recognized as relevant, so that even an LLM with a tight context window isn’t overwhelmed.

Feeding the Embedding Model With LangChain

The model that generates embeddings — OpenAI’s text-embedding-ada-002 — has a context window of its own. That makes it essential to break documentation into chunks that the embedding model can digest more easily.
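Before looking at chunking, it helps to see what a distance-based search actually looks like. The following minimal sketch (table and column names are placeholders, and the query vector is truncated for readability) uses pgvector's cosine-distance operator from Python:

Python

import psycopg2

conn = psycopg2.connect("dbname=docs user=postgres")  # placeholder connection settings
cur = conn.cursor()

# In practice this would be the embedding of the user's search phrase
query_embedding = [0.011, -0.032, 0.054]
vector_literal = "[" + ",".join(str(x) for x in query_embedding) + "]"

# <=> is pgvector's cosine-distance operator; <-> (L2) and <#> (inner product) also exist
cur.execute(
    "SELECT title, url FROM documents ORDER BY embedding <=> %s::vector LIMIT 3",
    (vector_literal,),
)
for title, url in cur.fetchall():
    print(title, url)

The rows that come back are the documents whose embeddings sit closest to the query embedding, which is exactly the retrieval step used in the workflow below.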
The LangChain Python framework offers a solution to this chunking problem. An LLM able to answer documentation queries needs these tasks completed first:

- Document loading: LangChain makes it simple to scrape documentation pages, with the ability to load diverse document formats from a range of locations.
- Document transformation: Segmenting large documents into smaller, digestible chunks enables retrieval of the pertinent document sections.
- Embedding generation: Calculate embeddings for the chunked documentation using OpenAI's embedding model.
- Data storing: Store embeddings and original content in PostgreSQL.

This process yields the semantic index of documentation we're after.

An Example User Query Workflow

Now consider this sample workflow for a user query (sticking with our documentation as the example tested). First, a user submits the question: "How do I create a Redis cluster using Terraform?" OpenAI's embeddings API calculates the question's embeddings. The system then queries the semantic index in PostgreSQL using cosine similarity, asking for the original content closest to the embeddings of the user's question. Finally, the system grabs the original content returned in the vector search, concatenates it together, and includes it in a specially crafted prompt with the user's original question.

Implementing pgvector and a User Interface

Now let's see how we put pgvector into action. First, we enabled the pgvector extension in our PostgreSQL database and created a table for storing all documents and their embeddings:

SQL

CREATE EXTENSION vector;

CREATE TABLE insta_documentation (
    id bigserial PRIMARY KEY,
    title text,
    content text,
    url text,
    embedding vector(3)
);

The following Python code scrapes the documentation, uses Beautiful Soup to extract the main text parts such as title and content, and stores them together with the URL in the PostgreSQL table:

Python

import psycopg2
import streamlit as st
from urllib.request import Request, urlopen
from bs4 import BeautifulSoup

urls = [...]

def init_connection():
    return psycopg2.connect(**st.secrets["postgres"])

def extract_info(url):
    hdr = {'User-Agent': 'Mozilla/5.0'}
    req = Request(url, headers=hdr)
    response = urlopen(req)
    soup = BeautifulSoup(response, 'html.parser')
    title = soup.find('title').text
    # The middle section consists of header, content, the Instaclustr banner, and back/forth links - we want only the first two
    middle_section = soup.find('div', class_='documentation-middle').contents
    content = str(middle_section[0]) + str(middle_section[1])
    return title, content, url

conn = init_connection()
cursor = conn.cursor()

for url in urls:
    page_content = extract_info(url)
    postgres_insert_query = """INSERT INTO insta_documentation (title, content, url) VALUES (%s, %s, %s)"""
    cursor.execute(postgres_insert_query, page_content)
    conn.commit()

if conn:
    cursor.close()
    conn.close()

Next, we loaded the documentation pages from the database, divided them into chunks, and created and stored the crucial embeddings.
Python

import pandas as pd
import psycopg2
import streamlit as st
# Imports follow the classic (pre-0.1) LangChain package layout used at the time of writing
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores.pgvector import PGVector, DistanceStrategy

def init_connection():
    return psycopg2.connect(**st.secrets["postgres"])

conn = init_connection()
cursor = conn.cursor()

# Define and execute a query to the insta_documentation table, limiting to 10 results for testing
# (creating embeddings through the OpenAI API can get costly when dealing with a huge amount of data)
postgres_query = """SELECT title, content, url FROM insta_documentation LIMIT 10"""
cursor.execute(postgres_query)
results = cursor.fetchall()
conn.commit()

# Load results into a pandas DataFrame for easier manipulation
df = pd.DataFrame(results, columns=['title', 'content', 'url'])

# Break down content text which exceeds the max input token limit into smaller chunk documents
# Define text splitter
html_splitter = RecursiveCharacterTextSplitter.from_language(language=Language.HTML, chunk_size=1000, chunk_overlap=100)

# We need to initialize our embeddings model
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

docs = []
for i in range(len(df.index)):
    # Create documents with metadata for each content chunk
    docs = docs + html_splitter.create_documents(
        [df['content'][i]],
        metadatas=[{"title": df['title'][i], "url": df['url'][i]}])

# Create the pgvector dataset (COLLECTION_NAME and CONNECTION_STRING are configuration constants defined elsewhere)
db = PGVector.from_documents(
    embedding=embeddings,
    documents=docs,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    distance_strategy=DistanceStrategy.COSINE,
)

Lastly, the retriever found the correct information to answer a given query. In our test example, we searched our documentation to learn how to sign up for an account:

Python

import streamlit as st
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI

query = st.text_input('Your question', placeholder='How can I sign up for an Instaclustr console account?')

# store is the PGVector vector store created above
retriever = store.as_retriever(search_kwargs={"k": 3})

qa = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
    verbose=True,
)

result = qa({"query": query})
source_documents = result["source_documents"]
document_page_content = [document.page_content for document in source_documents]
document_metadata = [document.metadata for document in source_documents]

Using Streamlit, a powerful tool for building interactive Python interfaces, we built this interface to test the system and view the successful query results.

Data Retrieval With Transformative Efficiency

Harnessing PostgreSQL and the open-source pgvector project empowers users to leverage natural language queries to answer questions immediately, with no need to comb through irrelevant data. The result: super accurate, performant, and efficient LLMs, groundbreaking textual capabilities, and meaningful time saved!
Python’s collections module has a feature called ‘Namedtuple’: a tuple with named elements that makes code more expressive. Much like dictionaries in Python, a ‘Namedtuple’ lets us access elements by name rather than only by index.

Creating a Namedtuple

To create a namedtuple, we use the ‘namedtuple’ factory function from the collections module.

Python

from collections import namedtuple

# Define an Employee tuple that has the fields id, name, and location
Employee = namedtuple('Employee', 'id name location')

# Create instances of Employee
employee1 = Employee(id=10, name='John Doe', location='Atlanta')
employee2 = Employee(id=11, name='Mick', location='Dallas')

Accessing Elements From a Namedtuple

'Namedtuple' provides a dual mechanism for element access. First, elements can be accessed through attribute names; the second mechanism uses traditional numeric indices.

Python

print(f"{employee1.name} - {employee1.location}")  # John Doe - Atlanta
print(f"{employee2.name} - {employee2.location}")  # Mick - Dallas

Elements can be accessed using numeric indices as well.

Python

print(f"{employee1[1]} - {employee1[2]}")  # John Doe - Atlanta
print(f"{employee2[1]} - {employee2[2]}")  # Mick - Dallas

Immutability

Immutability is a fundamental property of 'Namedtuples', inherited from regular tuples. It means that once the value of a field is set during creation, it cannot be modified.

Python

try:
    employee1.name = 'David'
except AttributeError as e:
    print(f"AttributeError: {e}")  # AttributeError: can't set attribute

Methods

'Namedtuple' not only provides a clean and readable way to structure data, but it also offers some useful methods that enhance its functionality.

a) _asdict(): The _asdict() method returns the named tuple as a dictionary, providing a convenient way to convert 'Namedtuples' into a format that is compatible with other data structures.

Python

employee1._asdict()  # {'id': 10, 'name': 'John Doe', 'location': 'Atlanta'}

b) _replace(): The _replace() method creates a new instance of the 'Namedtuple' with the specified fields replaced by new values. This preserves immutability while still allowing modifications.

Python

employee1_modified = employee1._replace(location='DFW')
employee1_modified  # Employee(id=10, name='John Doe', location='DFW')

c) _make(): The _make(iterable) method creates a new instance of the 'namedtuple' from an iterable. For example, we can create a namedtuple from a list using the _make() method.

Python

employee_list = [21, 'Bob', 'Gallup']
Employee._make(employee_list)  # Employee(id=21, name='Bob', location='Gallup')

Unpacking a Namedtuple

Through unpacking, Python's 'Namedtuples' let you assign their values to individual variables in a single, concise statement.

Python

id, name, location = employee1
print(f"id: {id}, name: {name}, location: {location}")

Transforming 'Namedtuples' Into Different Data Structures

You can convert a named tuple to a list by using the list() constructor. Here's an example:

Python

list(employee1)  # [10, 'John Doe', 'Atlanta']

You can convert a named tuple to a dictionary using the '_asdict()' method, which returns an OrderedDict that you can convert to a regular dictionary. Here's an example:

Python

dict(employee1._asdict())  # {'id': 10, 'name': 'John Doe', 'location': 'Atlanta'}

Advantages of Using ‘Namedtuple’

- Readability: ‘Namedtuples’ make code more readable by providing meaningful names for elements, eliminating the need for index-based access.
- Immutable: Like regular tuples, ‘Namedtuples’ are immutable. Once created, their values cannot be changed.
- Memory Efficient: ‘Namedtuples’ are memory-efficient, consuming less space than equivalent classes. The gain matters most in scenarios involving a large number of instances or large datasets.
- Lightweight Data Structures: Ideal for creating simple classes without the need for custom methods.
- Data Storage: Convenient for storing structured data, especially in scenarios where a full-fledged class is not necessary.
- APIs and Database Records: Useful for representing records returned from databases or data received from APIs.

‘Namedtuple’ in Python is well-suited for scenarios where you need a simple, immutable data structure with named fields, such as:

- Configuration settings: Use ‘Namedtuple’ to represent configuration settings with named fields for clarity and easy access (see the short sketch below).
- Database Records: ‘Namedtuple’ can represent database records, making it clear which field corresponds to which column in a table.
- Command-Line Parsing: Use ‘Namedtuple’ to store parsed command-line arguments, providing a clear structure for the input parameters.
- Named Constants: ‘Namedtuple’ can be used to represent named constants in your code, providing a clear and readable way to define constant values.

'Namedtuples' excel in these scenarios by offering clarity, readability, and immutability, making them valuable tools for concisely structuring data.
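To close with the configuration-settings use case mentioned above, a minimal sketch (the field names are illustrative):

Python

from collections import namedtuple

# Immutable configuration with named, readable fields
Config = namedtuple('Config', 'host port debug')
config = Config(host='localhost', port=8080, debug=False)

print(f"Connecting to {config.host}:{config.port}")  # Connecting to localhost:8080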
In this text, the final summary of my category theory series, I will use my spotlight to take a closer look at the relations between all three previously described functional containers, namely Functor, Monad, and Applicative. Below, you will find a comparison of them in terms of:

- Theory
- Laws
- Methods
- Possibilities and use cases

Theory

Functor

In category theory, a Functor represents a mapping between two categories. In software, a Functor can be viewed as a utility class that allows us to perform a mapping operation over values wrapped in some context. It can also be viewed as an interface providing a certain ability, namely the ability to transform the state of an object. In more functional-focused languages like Scala or Haskell, Functor is a typeclass.

There are a few types of Functors. Their in-depth description and analysis are out of the scope of this article, but do not be afraid: I added links to other resources that will help you deepen your knowledge. Functor types:

- Contravariant Functor
- Invariant Functor
- Opposite Functor
- Bifunctor and Multifunctor

There are no good built-in counterparts of Functors in Java and other modern-day JVM languages. Nevertheless, they are reasonably easy to implement manually. Probably the best counterpart can be found in the Scala library called Cats or in Haskell.

Applicative

In category theory, an Applicative, also known as an Applicative Functor, is a concept that generalizes a Functor by allowing operations not just on the values inside a context but also with functions within a context. It sits between Functors and Monads in terms of expressive power: while less powerful than Monads, Applicatives are more structured than Functors. Going by the book, Applicatives do not allow chaining operations in the way Monads do (with the output of one operation being the input of the next), but unlike Functors, they do allow us to sequence our computations. Unfortunately, such a distinction is quite hard to preserve in the world of software because, in the end, the underlying data ends up being used as input for the next operation.

Monad

In the world of software, we can view a Monad as a wrapper that puts our value in some context and allows us to perform operations on that value, specifically operations that return results wrapped in the Monadic context. Monads are mostly used to handle all kinds of side effects:

- Performing I/O operations
- Handling operations that can throw exceptions
- Handling async operations

Another important point is that we can chain operations in such a manner that the output of one step is the input of the next. Such behavior allows us to write very declarative code with minimal boilerplate. Contrary to Functor, Monad has quite a lot of implementations built into modern-day programming languages:

- Scala (Option, Try, Either, etc.)
- Haskell (IO, etc.)
- Java (Optional - not exactly a Monad, but close)

From both practical and theoretical points of view, it is also the most powerful of the three concepts. The complete hierarchy in terms of pure mathematical power, going from least to most powerful, is: Functor, Applicative, Monad.

Laws

Functor

If we want to implement a Functor, our implementation needs to obey two laws: Identity and Associativity.

- Identity: Mapping values inside the Functor with the identity function should always return an unchanged value.
- Associativity: In a chain of function applications, it should not matter how the functions are nested.
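To ground these two laws, here is a minimal Python sketch; the Box type is purely illustrative and not taken from any library:

Python

from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")

@dataclass(frozen=True)
class Box(Generic[T]):
    # A value wrapped in a context, plus a map operation: that is all a Functor needs
    value: T

    def map(self, f: Callable[[T], R]) -> "Box[R]":
        return Box(f(self.value))

# Identity: mapping with the identity function returns an unchanged value
assert Box(3).map(lambda x: x) == Box(3)

# Associativity: how the mapped functions are nested does not matter
f = lambda x: x + 1
g = lambda x: x * 2
assert Box(3).map(f).map(g) == Box(3).map(lambda x: g(f(x)))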
Applicative

If we want to implement an Applicative, our implementation needs to obey four laws, namely Identity, Homomorphism, Interchange, and Composition.

- Identity: Applying the identity function to a value inside the context should always return an unchanged value.
- Homomorphism: Applying a function in context to a value in context should give the same result as applying the function to the value and then putting the result in context with the usage of pure.
- Interchange: Applying a function with context f to a value with context should be the same as applying the wrapped function, which supplies the value as an argument to another function, to the function with context f.
- Composition: Applying a function with context f and then a function with context g should give the same result as applying the composition of f and g together within the context.

Alternative Applicative

There is also an equivalent version of Applicative whose implementation needs to obey three laws: Associativity, Left Identity, and Right Identity.

- Left identity: If we create a new Applicative and bind it to a function, the result should be the same as applying the function to the value.
- Right identity: The result of binding a unit function to the Applicative should be the same as the creation of a new Applicative.
- Associativity: In a chain of function applications, it should not matter how the functions are nested.

If you take a closer look at the laws of this version of Applicative, you will notice that they are the same as the Functor laws. The change in the Identity law originates from the difference in the setup of both concepts, in particular from the existence of the pure method in Applicative: in a way, we have to check one more case.

Monad

If we want to implement a Monad, we have to obey three laws: Left Identity, Right Identity, and Associativity.

- Left identity: If we create a new Monad and bind it to a function, the result should be the same as applying the function to the value.
- Right identity: The result of binding a unit function to a Monad should be the same as the creation of a new Monad.
- Associativity: In a chain of function applications, it should not matter how the functions are nested.

As in the case of Applicative, Monad laws are very similar to Functor laws. What is more, Monad laws are exactly the same as the alternative Applicative laws; the only difference is the concept the laws describe, while the conditions themselves remain unchanged.

The relation between the three, in terms of which concept extends which, looks more or less like this: every Monad is an Applicative, and every Applicative is a Functor. Basically, everything is a Functor.

Methods

To implement any of the structures, you will need a language with generics support, as a parameterized type M<T> is the base for any of them. Alternatively, you can generate the code for all required types via a macro or some other construct, but with generics it is significantly easier.

Functor

To implement a Functor, you only have to implement one method:

- map: You pass a function that operates on the value in context. This method should have the following signature: M<R> (T -> R).

Applicative

To implement an Applicative, you will have to implement two methods:

- pure: Used to wrap your value in the Applicative context. It has the following signature: M<T>(T).
- apply (ap): You pass a function with context, and the function then operates on the value in the context. This method should have the following signature: M<U> (M<T -> U>).
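Here is a hedged Python sketch of these two methods, reusing the illustrative Box type from the Functor sketch above (redefined so that the snippet stands alone):

Python

from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
U = TypeVar("U")

@dataclass(frozen=True)
class Box(Generic[T]):
    value: T

    @staticmethod
    def pure(value: T) -> "Box[T]":
        # pure: wrap a plain value in the Applicative context, M<T>(T)
        return Box(value)

    def apply(self, boxed_fn: "Box[Callable[[T], U]]") -> "Box[U]":
        # apply (ap): a function already inside the context is applied to the value in the context
        return Box(boxed_fn.value(self.value))

# Homomorphism, informally: applying a wrapped function to a wrapped value
# equals applying the plain function and wrapping the result
add_one = lambda x: x + 1
assert Box.pure(3).apply(Box.pure(add_one)) == Box.pure(add_one(3))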
Alternative Applicative

There is also an equivalent version of Applicative where we use product instead of apply:

- product: You pass two different values wrapped in the Applicative context and get a context with both values in return. This method should have the following signature: M<(T, U)>(M<T>, M<U>).

In both approaches to Applicatives, you get a map method with the signature M<R> (T -> R) by definition, as all Applicatives are Functors.

Monad

To implement a Monad, you will have to implement two methods:

- unit (of): Used to wrap the value in the Monad context. It has the following signature: M<T>(T).
- bind (flatMap): You pass a function that operates on the value in context, but the result is already wrapped in the Monad context. This method should have the following signature: M<U> (T -> M<U>).

What is more, a Monad is also an Applicative and, by extension, a Functor. That means we get a lot of additional methods for free, in particular a map method from Functor and the apply or product methods from Applicative.

The Possibilities and Use Cases

Functor

The Functor is just a utility. It does not provide anything more than a simple mapping of a value inside an arbitrary context. On the other hand, it is also the simplest and easiest of all three concepts and can be used almost anywhere we need to perform operations over values in a context.

Though Functors still have their limitations. By definition, a Functor is unable to chain computations. What is more, a Functor does not provide a way to flatten the result of performed operations, so we can easily end up with nested types like List<List<Long>>. However, the possibility of performing effectful operations without moving values out of their context is quite a good thing.

Applicative

Going further, we have the Applicative, with which we can apply functions in context to a value in context. Additionally, with its product version, we can create a tuple out of two objects within the same context. Such behavior can be extremely useful in some cases: we can use it to compose multiple effects into one effect, reducing types like List<Future> to the more reasonable Future<List>. That is the reason why Applicatives are a great choice for implementing concepts like parsers, traversables, or composers, and any other use case where we need to work with many independent effects of the same type. What is more, because the Applicative is a Functor, we can also apply operations to such output tuples via the map method.

Monad

Last but not least, we have Monad – the most powerful of all three structures. Each Monad implementation represents an effect:

- Emptiness (Option)
- Possible failure (Try)
- Result or failure of an operation (Either)
- Chain of computations (IO)

What is more, Monad gives us a way to put values in such an effectful context. Furthermore, thanks to the flatMap method, we can handle nested return types. This in turn solves the issue with M<M<T>>-styled types (Optional<Optional<Long>>): a Monad implementation will automatically flatten such types to a single effect, something the map method from Functor is unable to do by definition. Additionally, with Monads, we can perform contextual operations, as we can pass the output of one operation as the input to another, achieving a nice, declarative chain of computations.

Summary

Functional containers are a very useful tool that can be applied to a variety of problems. In my opinion, we should find them a place in our engineering toolbox or at least be aware of them.
Just please remember that they are not a silver bullet, and making everything a Monad may not be the best way to write your code.

If you would like to deepen your knowledge of a particular container, here is a list of my articles describing all three of them in more detail:

- What Is a Functor? Basic Theory for Java Developers
- What Is Applicative? Basic Theory for Java Developers
- What Is a Monad? Basic Theory for a Java Developer

As a side note, I would like to add that these containers bring another nice touch of math to the world of software engineering, which is one more reason to like them, at least in my opinion. I hope you found this text interesting. Thank you for your time.

Review by Mateusz Borek
In today's highly competitive landscape, businesses must be able to gather, process, and react to data in real time in order to survive and thrive. Whether it's detecting fraud, personalizing user experiences, or monitoring systems, near-instant data is now a need, not a nice-to-have. However, building and running mission-critical, real-time data pipelines is challenging. The infrastructure must be fault-tolerant, infinitely scalable, and integrated with various data sources and applications. This is where leveraging Apache Kafka, Python, and cloud platforms comes in handy.

In this comprehensive guide, we will cover:

- An overview of Apache Kafka architecture
- Running Kafka clusters on the cloud
- Building real-time data pipelines with Python
- Scaling processing using PySpark
- Real-world examples like user activity tracking, an IoT data pipeline, and support chat analysis

We will include plenty of code snippets, configuration examples, and links to documentation along the way for you to get hands-on experience with these incredibly useful technologies. Let's get started!

Apache Kafka Architecture 101

Apache Kafka is a distributed, partitioned, replicated commit log for storing streams of data reliably and at scale. At its core, Kafka provides the following capabilities:

- Publish-subscribe messaging: Kafka lets you broadcast streams of data like page views, transactions, user events, etc., from producers and consume them in real time using consumers.
- Message storage: Kafka durably persists messages on disk as they arrive and retains them for specified periods. Messages are stored and indexed by an offset indicating their position in the log.
- Fault tolerance: Data is replicated across a configurable number of servers. If a server goes down, another can ensure continuous operations.
- Horizontal scalability: Kafka clusters can be elastically scaled by simply adding more servers. This allows for unlimited storage and processing capacity.

Kafka architecture consists of the following main components:

Topics

Messages are published to categories called topics. Each topic acts as a feed or queue of messages, and a common pattern is one topic per message type or data stream. Each message in a Kafka topic has a unique identifier called an offset, which represents its position in the topic. A topic can be divided into multiple partitions, which are segments of the topic that can be stored on different brokers. Partitioning allows Kafka to scale and parallelize data processing by distributing the load among multiple consumers.

Producers

These are applications that publish messages to Kafka topics. They connect to the Kafka cluster, serialize data (say, to JSON or Avro), assign a key, and send it to the appropriate topic. For example, a web app can produce clickstream events, or a mobile app can produce usage stats.

Consumers

Consumers read messages from Kafka topics and process them. Processing may involve parsing data, validation, aggregation, filtering, storing to databases, etc. Consumers connect to the Kafka cluster and subscribe to one or more topics to get feeds of messages, which they then handle as per the use case requirements.

Brokers

This is the Kafka server that receives messages from producers, assigns offsets, commits messages to storage, and serves data to consumers. Kafka clusters consist of multiple brokers for scalability and fault tolerance.

ZooKeeper

ZooKeeper handles coordination and consensus between brokers, like controller election and topic configuration. It maintains the cluster state and configuration info required for Kafka operations.
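To make topics, partitions, and replication concrete, here is a small sketch that creates a partitioned, replicated topic with the confluent-kafka admin client (the cluster address and sizing values are placeholders):

Python

from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder bootstrap address; point this at your own cluster
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# A 'clickstream' topic with 3 partitions, each replicated to 2 brokers
futures = admin.create_topics([NewTopic("clickstream", num_partitions=3, replication_factor=2)])

for topic, future in futures.items():
    try:
        future.result()  # block until the broker confirms (or rejects) creation
        print(f"Created topic {topic}")
    except Exception as err:
        print(f"Failed to create topic {topic}: {err}")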
This covers the Kafka basics. For an in-depth understanding, refer to the excellent Kafka documentation. Now, let's look at simplifying management by running Kafka in the cloud.

Kafka in the Cloud

While Kafka is highly scalable and reliable, operating it involves significant effort related to deployment, infrastructure management, monitoring, security, failure handling, upgrades, etc. Thankfully, Kafka is now available as a fully managed service from all major cloud providers:

Service | Description | Pricing
AWS MSK | Fully managed, highly available Apache Kafka clusters on AWS. Handles infrastructure, scaling, security, failure handling, etc. | Based on number of brokers
Google Cloud Pub/Sub | Serverless, real-time messaging service based on Kafka. Auto-scaling, at-least-once delivery guarantees. | Based on usage metrics
Confluent Cloud | Fully managed event streaming platform powered by Apache Kafka. Free tier available. | Tiered pricing based on features
Azure Event Hubs | High-throughput event ingestion service for Apache Kafka. Integrations with Azure data services. | Based on throughput units

The managed services abstract away the complexities of Kafka operations and let you focus on your data pipelines. Next, we will build a real-time pipeline with Python, Kafka, and the cloud. You can also refer to the following guide as another example.

Building Real-Time Data Pipelines

A basic real-time pipeline with Kafka has two main components: a producer that publishes messages to Kafka and a consumer that subscribes to topics and processes the messages. We will use the Confluent Kafka Python client library for simplicity.

1. Python Producer

The producer application gathers data from sources and publishes it to Kafka topics. As an example, let's say we have a Python service collecting user clickstream events from a web application. When a user acts, for example with a page view or a product rating, we can capture these events and send them to Kafka. We can abstract away the implementation details of how the web app collects the data.

Python

from confluent_kafka import Producer
import json

# User event data
event = {
    "timestamp": "2022-01-01T12:22:25",
    "userid": "user123",
    "page": "/product123",
    "action": "view"
}

# Convert to JSON
event_json = json.dumps(event)

# Kafka producer configuration
conf = {
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'client.id': 'clickstream-producer'
}

# Create producer instance
producer = Producer(conf)

# Publish event
producer.produce(topic='clickstream', value=event_json)

# Flush buffered messages before exiting
producer.flush()

This publishes the event to the clickstream topic on our cloud-hosted Kafka cluster. The confluent_kafka Python client uses an internal buffer to batch messages before sending them to Kafka, which improves efficiency compared to sending each message individually. By default, messages accumulate in the buffer until either:

- The buffer size limit is reached (default 32 MB).
- The flush() method is called.

When flush() is called, any messages in the buffer are immediately sent to the Kafka broker. If we did not call flush() and instead relied on the buffer size limit, there would be a risk of losing events if a failure happened before the next auto-flush. Calling flush() gives us greater control to minimize potential message loss. However, calling flush() after every produce call introduces additional overhead.
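As an alternative to flushing after every message, the producer can rely on batching plus per-message delivery callbacks. The sketch below illustrates the idea; the linger.ms value is an illustrative librdkafka setting, and the events are stand-ins:

Python

from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "my_kafka_cluster-xyz.cloud.provider.com:9092",
    "client.id": "clickstream-producer",
    "linger.ms": 50,  # wait up to 50 ms so messages can be batched together
}
producer = Producer(conf)

def delivery_report(err, msg):
    # Called once per message when the broker acknowledges (or rejects) it
    if err is not None:
        print(f"Delivery failed: {err}")

for event_json in ['{"action": "view"}', '{"action": "rating"}']:  # stand-in events
    producer.produce(topic="clickstream", value=event_json, on_delivery=delivery_report)
    producer.poll(0)  # serve delivery callbacks without blocking

producer.flush()  # flush once at shutdown instead of after every message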
We can keep adding events as they occur to build a live stream. This gives downstream data consumers a continual feed of events. 2. Python Consumer Next, we have a consumer application to ingest events from Kafka and process them. For example, we may want to parse events, filter for a certain subtype, and validate the schema. Python from confluent_kafka import Consumer import json # Kafka consumer configuration conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092', 'group.id': 'clickstream-processor', 'auto.offset.reset': 'earliest'} # Create consumer instance consumer = Consumer(conf) # Subscribe to 'clickstream' topic consumer.subscribe(['clickstream']) # Poll Kafka for messages infinitely while True: msg = consumer.poll(1.0) if msg is None: continue # Skip (or log) broker-side errors instead of treating them as data if msg.error(): print(msg.error()) continue # Parse JSON from message value event = json.loads(msg.value()) # Process event based on business logic if event['action'] == 'view': print('User viewed product page') elif event['action'] == 'rating': # Validate rating, insert to DB etc pass print(event) # Print event # Close consumer (reached only when the loop is broken on shutdown) consumer.close() This polls the clickstream topic for new messages, consumes them, and takes action based on the event type - prints, updates database, etc. For a simple pipeline, this works well. But what if we get 100x more events per second? The consumer will not be able to keep up. This is where a tool like PySpark helps scale out processing. 3. Scaling With PySpark PySpark provides a Python API for Apache Spark, a distributed computing framework optimized for large-scale data processing. With PySpark, we can leverage Spark's in-memory computing and parallel execution to consume Kafka streams faster. First, we load Kafka data into a DataFrame, which can be manipulated using Spark SQL or Python. Python from pyspark.sql import SparkSession from pyspark.sql.functions import from_json, col from pyspark.sql.types import StructType, StructField, StringType # Initialize Spark session (the Kafka source requires the spark-sql-kafka package on the classpath) spark = SparkSession.builder \ .appName('clickstream-consumer') \ .getOrCreate() # Schema of the clickstream events produced earlier schema = StructType([ StructField("timestamp", StringType()), StructField("userid", StringType()), StructField("page", StringType()), StructField("action", StringType()) ]) # Read stream from Kafka 'clickstream' df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \ .option("subscribe", "clickstream") \ .load() # Parse JSON from value df = df.selectExpr("CAST(value AS STRING)") df = df.select(from_json(col("value"), schema).alias("data")) Next, we can express whatever processing logic we need using DataFrame transformations: Python from pyspark.sql.functions import * # Filter for 'page view' events views = df.filter(col("data.action") == "view") # Count views per page URL counts = views.groupBy(col("data.page")) \ .count() \ .orderBy("count") # Print the stream query = counts.writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination() This applies operations like filter, aggregate, and sort on the stream in real-time, leveraging Spark's distributed runtime. We can also parallelize consumption using multiple consumer groups and write the output sink to databases, cloud storage, etc. This allows us to build scalable stream processing on data from Kafka.
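The console sink above is handy for debugging, but in practice the aggregated results usually land in a database or object store. One way to do that with Structured Streaming is foreachBatch, which hands each micro-batch to ordinary batch code. The sketch below is illustrative only: it assumes the counts DataFrame built above, a PostgreSQL JDBC driver on the Spark classpath, and placeholder connection details and table name.

Python
def write_to_postgres(batch_df, batch_id):
    # Runs once per micro-batch; batch_df is a regular (non-streaming) DataFrame
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/analytics")  # placeholder connection
        .option("dbtable", "page_view_counts")                      # placeholder table
        .option("user", "analytics_user")
        .option("password", "analytics_password")
        .mode("overwrite")  # complete mode re-emits the full result, so overwrite each batch
        .save())

query = (counts.writeStream
    .outputMode("complete")
    .foreachBatch(write_to_postgres)
    .option("checkpointLocation", "/tmp/checkpoints/page_view_counts")  # needed for fault tolerance
    .start())

query.awaitTermination()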
Now that we've covered the end-to-end pipeline, let's look at some real-world examples of applying it. Real-World Use Cases Let's explore some practical use cases where these technologies can help process huge amounts of real-time data at scale. User Activity Tracking Many modern web and mobile applications track user actions like page views, button clicks, transactions, etc., to gather usage analytics. Problem: Data volumes can scale massively with millions of active users; insights are needed in real time to detect issues and personalize content; aggregate data must be stored for historical reporting. Solution: Ingest clickstream events into Kafka topics using Python or any language. Process using PySpark for cleansing, aggregations, and analytics (a small windowed-aggregation sketch appears at the end of this article). Save output to databases like Cassandra for dashboards. Detect anomalies using Spark ML for real-time alerting. IoT Data Pipeline IoT sensors generate massive volumes of real-time telemetry like temperature, pressure, location, etc. Problem: Millions of sensor events arrive per second; the data requires cleaning, transforming, and enriching; both real-time monitoring and historical storage are needed. Solution: Collect sensor data in Kafka topics using language SDKs. Use PySpark for data wrangling and joining external data. Feed the stream into ML models for real-time predictions. Store aggregate data in a time series database for visualization. Customer Support Chat Analysis Chat platforms like Zendesk capture huge amounts of customer support conversations. Problem: Millions of chat messages arrive per month; customer pain points and agent performance need to be understood; negative sentiment and urgent issues must be detected. Solution: Ingest chat transcripts into Kafka topics using a connector. Aggregate and process them using PySpark SQL and DataFrames. Feed the data into NLP models to classify sentiment and intent. Store the insights in a database for historical reporting. Present real-time dashboards for contact center ops. This demonstrates applying the technologies to real business problems involving massive, fast-moving data. Learn More To summarize, we looked at how Python, Kafka, and the cloud provide a great combination for building robust, scalable real-time data pipelines.
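To make the user activity tracking solution a bit more concrete, here is a small windowed-aggregation sketch. It assumes the parsed df streaming DataFrame from the PySpark section earlier; the window and watermark durations are arbitrary illustrative choices, and the console sink stands in for a real dashboard store.

Python
from pyspark.sql.functions import window, col

# Project the fields we need and cast the event time to a proper timestamp
events = df.select(
    col("data.page").alias("page"),
    col("data.action").alias("action"),
    col("data.timestamp").cast("timestamp").alias("event_time")
)

# Tumbling five-minute page-view counts, tolerating ten minutes of late data
windowed_counts = (events
    .filter(col("action") == "view")
    .withWatermark("event_time", "10 minutes")
    .groupBy(window(col("event_time"), "5 minutes"), col("page"))
    .count())

windowed_query = (windowed_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start())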
In the dynamic world of web development, Single Page Applications (SPAs) and frameworks like React, Angular, and Vue.js have emerged as the preferred approach for delivering seamless user experiences. With the evolution of the Kotlin language and its recent multiplatform capabilities, new options exist that are worthwhile to evaluate. In this article, we will explore Kotlin/JS for creating a web application that communicates with a Spring Boot backend which is also written in Kotlin. In order to keep it as simple as possible, we will not bring in any other framework. Advantages of Kotlin/JS for SPA Development As described in the official documentation, Kotlin/JS provides the ability to transpile Kotlin code, the Kotlin standard library, and any compatible dependencies to JavaScript (ES5). With Kotlin/JS we can manipulate the DOM and create dynamic HTML by taking advantage of Kotlin's conciseness and expressiveness, coupled with its compatibility with JavaScript. And of course, we do have the much needed type-safety, which reduces the likelihood of runtime errors. This enables developers to write client-side code with reduced boilerplate and fewer errors. Additionally, Kotlin/JS seamlessly integrates with popular JavaScript libraries (and frameworks), thus leveraging the extensive ecosystem of existing tools and resources. And, last but not least: this makes it easier for a backend developer to be involved with the frontend part as it looks more familiar. Moderate knowledge of "vanilla" JavaScript, the DOM, and HTML is of course needed; but especially when we are dealing with non-intensive apps (admin panels, back-office sites, etc.), one can get engaged rather smoothly. Sample Project The complete source code for this showcase is available on GitHub. The backend utilizes Spring Security for protecting a simple RESTful API with basic CRUD operations. We won't expand more on this since we want to keep the spotlight on the frontend part which demonstrates the following: Log in with username/password Cookie-based session Page layout with multiple tabs and top navigation bar (based on Bootstrap) Client-side routing (based on Navigo) Table with pagination, sorting, and filtering populated with data fetched from the backend (based on DataTables) Basic form with input fields including (dependent) drop-down lists (based on Bootstrap) Modals and loading masks (based on Bootstrap and spin.js) Usage of sessionStorage and localStorage Usage of Ktor HttpClient for making HTTP calls to the backend An architectural overview is provided in the diagram below: Starting Point The easiest way to start exploring is by creating a new Kotlin Multiplatform project from IntelliJ. The project's template must be "Full-Stack Web Application": This will create the following project structure: springMain: This is the module containing the server-side implementation. springTest: For the Spring Boot tests commonMain: This module contains "shared" code between the frontend and the backend; e.g., DTOs commonTest: For the unit tests of the "common" module jsMain: This is the frontend module responsible for our SPA. jsTest: For the Kotlin/JS tests The sample project on GitHub is based on this particular skeleton. Once you clone the project you may start the backend by executing: $ ./gradlew bootRun This will spin up the SpringBoot app, listening on port: 8090. 
In order to start the frontend, execute: $ ./gradlew jsBrowserDevelopmentRun -t This will open up a browser window automatically navigating to http://localhost:8080 and presenting the user login page. For convenience, a couple of users are provisioned on the server (have a look at dev.kmandalas.demo.config.SecurityConfig for details). Once logged in, the user views a group of tabs with the main tab presenting a table (data grid) with items fetched from the server. The user can interact with the table (paging, sorting, filtering, data export) and add a new item (product) by pressing the "Add product" button. In this case, a form is presented within a modal with typical input fields and dependent drop-down lists with data fetched from the server. In fact, there is some caching applied on this part in order to reduce network calls. Finally, from the top navigation bar, the user can toggle the theme (this setting is preserved in the browser's local storage) and perform logout. In the next section, we will explore some low-level details for selected parts of the frontend module. The jsMain Module Let's start by having a look at the structure of the module: The naming of the Kotlin files should give an idea about the responsibility of each class. The "entrypoint" is of course the Main.kt class: Kotlin import home.Layout import kotlinx.browser.window import kotlinx.coroutines.MainScope import kotlinx.coroutines.launch fun main() { MainScope().launch { window.onload = { Layout.init() val router = Router() router.start() } } } Once the "index.html" file is loaded, we initialize the Layout and our client-side Router. Now, the "index.html" imports the JavaScript source files of the things we use (Bootstrap, Navigo, Datatables, etc.) and their corresponding CSS files. And of course, it imports the "transpiled" JavaScript file of our Kotlin/JS application. Apart from this, the HTML body part consists of some static parts like the "Top Navbar," and most importantly, our root HTML div tag. Under this tag, we will perform the DOM manipulations needed for our simple SPA. By importing the kotlinx.browser package in our Kotlin classes and singletons, we have access to top-level objects such as the document and window. The standard library provides typesafe wrappers for the functionality exposed by these objects (wherever possible) as described in the Browser and DOM API. So this is what we do at most parts of the module by writing Kotlin and not JavaScript or using jQuery, and at the same time having type-safety without using, e.g., TypeScript. 
So for example we can create content like this: Kotlin private fun buildTable(products: List<Product>): HTMLTableElement { val table = document.createElement("table") as HTMLTableElement table.className = "table table-striped table-hover" // Header val thead = table.createTHead() val headerRow = thead.insertRow() headerRow.appendChild(document.createElement("th").apply { textContent = "ID" }) headerRow.appendChild(document.createElement("th").apply { textContent = "Name" }) headerRow.appendChild(document.createElement("th").apply { textContent = "Category" }) headerRow.appendChild(document.createElement("th").apply { textContent = "Price" }) // Body val tbody = table.createTBody() for (product in products) { val row = tbody.insertRow() row.appendChild(document.createElement("td").apply { textContent = product.id.toString() }) row.appendChild(document.createElement("td").apply { textContent = product.name }) row.appendChild(document.createElement("td").apply { textContent = product.category.name }) row.appendChild(document.createElement("td").apply { textContent = product.price.toString() }) } document.getElementById("root")?.appendChild(table) return table } Alternatively, we can use the Typesafe HTML DSL of the kotlinx.html library which looks pretty cool. Or we can load HTML content as "templates" and further process them. Seems that many possibilities exist for this task. Moving on, we can attach event-listeners thus dynamic behavior to our UI elements like this: Kotlin categoryDropdown?.addEventListener("change", { val selectedCategory = categoryDropdown.value // Fetch sub-categories based on the selected category mainScope.launch { populateSubCategories(selectedCategory) } }) Before talking about some "exceptions to the rule", it's worth mentioning that we use the Ktor HTTP client (see ProductApi) for making the REST calls to the backend. We could use the ported Fetch API for this task but going with the client looks way better. Of course, we need to add the ktor-client as a dependency to the build.gradle.kts file: Kotlin val jsMain by getting { dependsOn(commonMain) dependencies { implementation("io.ktor:ktor-client-core:$ktorVersion") implementation("io.ktor:ktor-client-js:$ktorVersion") implementation("io.ktor:ktor-client-content-negotiation:$ktorVersion") //... } } The client includes the JSESSIONID browser cookie received from the server upon successful authentication of the HTTP requests. If this is omitted, we will get back HTTP 401/403 errors from the server. These are also handled and displayed within Bootstrap modals. Also, a very convenient thing regarding the client-server communication is the sharing of common data classes (Product.kt and Category.kt, in our case) between the jsMain and springMain modules. Exception 1: Use Dependencies From npm For client-side routing, we selected the Navigo JavaScript library. This library is not part of Kotlin/JS, but we can import it in Gradle using the npm function: Kotlin val jsMain by getting { dependsOn(commonMain) dependencies { //... implementation(npm("navigo", "8.11.1")) } } However, because JavaScript modules are dynamically typed and Kotlin is statically typed, in order to manipulate Navigo from Kotlin we have to provide an "adapter." 
This is what we do within the Router.kt class: Kotlin @JsModule("navigo") @JsNonModule external class Navigo(root: String, resolveOptions: ResolveOptions = definedExternally) { fun on(route: String, handler: () -> Unit) fun resolve() fun navigate(s: String) } With this in place, the Navigo JavaScript module can be used just like a regular Kotlin class. Exception 2: Use JavaScript Code From Kotlin It is possible to invoke JavaScript functions from Kotlin code using the js() function. Here are some examples from our example project: Kotlin // From ProductTable.kt: private fun initializeDataTable() { js("new DataTable('#$PRODUCTS_TABLE_ID', $DATATABLE_OPTIONS)") } // From ModalUtil.kt: val modalElement = document.getElementById(modal.id) as? HTMLDivElement modalElement?.let { js("new bootstrap.Modal(it).show()") } However, this should be used with caution since this way we are outside Kotlin's type system. Takeaways In general, the best framework to choose depends on several factors with one of the most important ones being, "The one that the developer team is more familiar with." On the other hand, according to Thoughtworks Technology radar, the SPA by default approach is under question; meaning, that we should not blindly accept the complexity of SPAs and their frameworks by default even when the business needs don't justify it. In this article, we provided an introduction to Kotlin multiplatform with Kotlin/JS which brings new things to the table. Taking into consideration the latest additions in the ecosystem - namely Kotlin Wasm and Compose Multiplatform - it becomes evident that these advancements offer not only a fresh perspective but also robust solutions for streamlined development.
Python Flask is a popular framework for building web applications and APIs in Python. It provides developers with a quick and easy way to create RESTful APIs that can be used by other software applications. Flask is lightweight and requires minimal setup, making it a great choice for building small to medium-sized APIs and an ideal option for developers looking to build robust, scalable APIs in Python. This tutorial will walk through how to create a simple REST API with Flask. Pre-Requisites Before we start, we must ensure that a couple of prerequisites are completed. To follow along and run this tutorial, you will need to install Python, have an IDE to edit your code, and install Postman so that you can test the endpoint. After all 3 of these prerequisites are completed, we can begin! Creating the Base Project To create the base project, the first thing we will do is create a folder named python-flask-api in your chosen directory. With the new folder created, we will open a terminal in the root of the folder so that commands can be executed to build and run our Python project. Once your terminal is pointed to the root directory of your project, run the following command to initialize the Python REST API Flask project and manage its dependencies: we will use pip to install Flask in the project directory. To do this, run the command below. Shell pip install Flask Writing the Code In the first lines of code in the app.py file, we will import the json module along with Flask, jsonify, and request. Python import json from flask import Flask, jsonify, request Next, we will create a new Flask application by adding the following code just below our import statements. Python app = Flask(__name__) Next, to give our API a little bit of data to work with, we will define an array of employee objects with an ID and name. Python employees = [ { 'id': 1, 'name': 'Ashley' }, { 'id': 2, 'name': 'Kate' }, { 'id': 3, 'name': 'Joe' }] To define our API endpoint, we will now add code to define a route for GET requests to the ‘/employees’ endpoint. This will return all employees (from our employees array defined above) in JSON format. Python @app.route('/employees', methods=['GET']) def get_employees(): return jsonify(employees) On top of our GET method, we will also define a route for POST, PUT, and DELETE methods as well. These functions can be used to create a new employee and update or delete an employee based on their given ID. (They rely on the nextEmployeeId counter and the get_employee and employee_is_valid helper functions, which appear in the complete listing below.) Python @app.route('/employees', methods=['POST']) def create_employee(): global nextEmployeeId employee = json.loads(request.data) if not employee_is_valid(employee): return jsonify({ 'error': 'Invalid employee properties.' }), 400 employee['id'] = nextEmployeeId nextEmployeeId += 1 employees.append(employee) return '', 201, { 'location': f'/employees/{employee["id"]}' } @app.route('/employees/<int:id>', methods=['PUT']) def update_employee(id: int): employee = get_employee(id) if employee is None: return jsonify({ 'error': 'Employee does not exist.' }), 404 updated_employee = json.loads(request.data) if not employee_is_valid(updated_employee): return jsonify({ 'error': 'Invalid employee properties.' }), 400 employee.update(updated_employee) return jsonify(employee) @app.route('/employees/<int:id>', methods=['DELETE']) def delete_employee(id: int): global employees employee = get_employee(id) if employee is None: return jsonify({ 'error': 'Employee does not exist.'
}), 404 employees = [e for e in employees if e['id'] != id] return jsonify(employee), 200 Once our code is complete, it should look like this: Python import json from flask import Flask, jsonify, request app = Flask(__name__) employees = [ { 'id': 1, 'name': 'Ashley' }, { 'id': 2, 'name': 'Kate' }, { 'id': 3, 'name': 'Joe' } ] nextEmployeeId = 4 @app.route('/employees', methods=['GET']) def get_employees(): return jsonify(employees) @app.route('/employees/<int:id>', methods=['GET']) def get_employee_by_id(id: int): employee = get_employee(id) if employee is None: return jsonify({ 'error': 'Employee does not exist'}), 404 return jsonify(employee) def get_employee(id): return next((e for e in employees if e['id'] == id), None) def employee_is_valid(employee): for key in employee.keys(): if key != 'name': return False return True @app.route('/employees', methods=['POST']) def create_employee(): global nextEmployeeId employee = json.loads(request.data) if not employee_is_valid(employee): return jsonify({ 'error': 'Invalid employee properties.' }), 400 employee['id'] = nextEmployeeId nextEmployeeId += 1 employees.append(employee) return '', 201, { 'location': f'/employees/{employee["id"]}' } @app.route('/employees/<int:id>', methods=['PUT']) def update_employee(id: int): employee = get_employee(id) if employee is None: return jsonify({ 'error': 'Employee does not exist.' }), 404 updated_employee = json.loads(request.data) if not employee_is_valid(updated_employee): return jsonify({ 'error': 'Invalid employee properties.' }), 400 employee.update(updated_employee) return jsonify(employee) @app.route('/employees/<int:id>', methods=['DELETE']) def delete_employee(id: int): global employees employee = get_employee(id) if employee is None: return jsonify({ 'error': 'Employee does not exist.' }), 404 employees = [e for e in employees if e['id'] != id] return jsonify(employee), 200 if __name__ == '__main__': app.run(port=5000) Lastly, we will add a line of code to run our Flask app. As you can see, we call the run method and get the Flask app running on port 5000. Python if __name__ == '__main__': app.run(port=5000) Running and Testing The Code With our code written and saved, we can start the app up. To run the app, in the terminal, we will execute the following command. Shell python app.py Now, our API is up and running. You can send a test HTTP request through Postman by sending a request to localhost:5000/employees. After the request is sent, you should see a 200 OK status code in the response along with an array of employees returned. For this test, no request body is needed for the incoming request. Wrapping Up With that, we’ve created a simple RESTful API with Flask and Python. This code can then be expanded on as needed to build APIs for your applications.
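If you would rather exercise the API from Python instead of Postman, the requests library (installed separately with pip install requests) can hit the same endpoints. The snippet below assumes the app is running locally on port 5000 as configured above and was started fresh, so the next generated ID is 4; the sample payload is purely illustrative.

Python
import requests

BASE_URL = "http://localhost:5000"

# List all employees (no request body needed)
resp = requests.get(f"{BASE_URL}/employees")
print(resp.status_code)  # expected: 200
print(resp.json())       # the seeded employee list

# Create a new employee; the API only accepts a 'name' property
resp = requests.post(f"{BASE_URL}/employees", json={"name": "Maria"})
print(resp.status_code)              # expected: 201
print(resp.headers.get("location"))  # e.g., /employees/4

# Update and then delete the employee we just created
print(requests.put(f"{BASE_URL}/employees/4", json={"name": "Maria R."}).json())
print(requests.delete(f"{BASE_URL}/employees/4").status_code)  # expected: 200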
This article starts with an overview of what a typical computer vision application requires. Then, it introduces Pipeless, an open-source framework that offers a serverless development experience for embedded computer vision. Finally, you will find a detailed step-by-step guide on the creation and execution of a simple object detection app with just a couple of Python functions and a model. Inside a Computer Vision Application "The art of identifying visual events via a camera interface and reacting to them" That is what I would answer if someone asked me to describe what computer vision is in one sentence. But it is probably not what you want to hear. So let's dive into how computer vision applications are typically structured and what is required in each subsystem. Really fast frame processing: Note that to process a stream of 60 FPS in real-time, you only have 16 ms to process each frame. This is achieved, in part, via multi-threading and multi-processing. In many cases, you want to start processing a frame even before the previous one has finished. An AI model to run inference on each frame and perform object detection, segmentation, pose estimation, etc: Luckily, there are more and more open-source models that perform pretty well, so we don't have to create our own from scratch, you usually just fine-tune the parameters of a model to match your use case (we will not deep dive into this today). An inference runtime: The inference runtime takes care of loading the model and running it efficiently on the different available devices (GPUs or CPUs). A GPU: To run the inference using the model fast enough, we require a GPU. This happens because GPUs can handle orders of magnitude more parallel operations than a CPU, and a model at the lowest level is just a huge bunch of mathematical operations. You will need to deal with the memory where the frames are located. They can be at the GPU memory or at the CPU memory (RAM) and copying frames between those is a very heavy operation due to the frame sizes that will make your processing slow. Multimedia pipelines: These are the pieces that allow you to take streams from sources, split them into frames, provide them as input to the models, and, sometimes, make modifications and rebuild the stream to forward it. Stream management: You may want to make the application resistant to interruptions in the stream, re-connections, adding and removing streams dynamically, processing several of them at the same time, etc. All those systems need to be created or incorporated into your project and thus, it is code that you need to maintain. The problem is that you end up maintaining a huge amount of code that is not specific to your application, but subsystems around the actual case-specific code. The Pipeless Framework To avoid having to build all the above from scratch, you can use Pipeless. It is an open-source framework for computer vision that allows you to provide a few functions specific to your case and it takes care of everything else. Pipeless splits the application's logic into "stages," where a stage is like a micro app for a single model. A stage can include pre-processing, running inference with the pre-processed input, and post-processing the model output to take any action. Then, you can chain as many stages as you want to compose the full application even with several models. To provide the logic of each stage, you simply add a code function that is very specific to your application, and Pipeless takes care of calling it when required. 
This is why you can think about Pipeless as a framework that provides a serverless-like development experience for embedded computer vision. You provide a few functions, and you don't have to worry about all the surrounding systems that are required. Another great feature of Pipeless is that you can add, remove, and update streams dynamically via a CLI or a REST API to fully automate your workflows. You can even specify restart policies that indicate when the processing of a stream should be restarted, whether it should be restarted after an error, etc. Finally, to deploy Pipeless you just need to install it and run it along with your code functions on any device, whether that is a cloud VM, a containerized environment, or an edge device like an Nvidia Jetson, a Raspberry Pi, or others. Creating an Object Detection Application Let's deep dive into how to create a simple application for object detection using Pipeless. The first thing we have to do is install it. Thanks to the installation script, it is very simple: curl https://raw.githubusercontent.com/pipeless-ai/pipeless/main/install.sh | bash Now, we have to create a project. A Pipeless project is a directory that contains stages. Every stage is under a sub-directory, and inside each sub-directory, we create the files containing hooks (our specific code functions). The name that we provide to each stage folder is the stage name that we have to indicate to Pipeless later when we want to run that stage for a stream. pipeless init my-project --template empty cd my-project Here, the empty template tells the CLI to just create the directory; if you do not provide any template, the CLI will prompt you with several questions to create the stage interactively. As mentioned above, we now need to add a stage to our project. Let's download an example stage from GitHub with the following command: wget -O - https://github.com/pipeless-ai/pipeless/archive/main.tar.gz | tar -xz --strip=2 "pipeless-main/examples/onnx-yolo" That will create a stage directory, onnx-yolo, that contains our application functions. Let's check the content of each of the stage files; i.e., our application hooks. We have the pre-process.py file, which defines a function (hook) taking a frame and a context. The function performs some operations to prepare the input data from the received RGB frame in order to match the format that the model expects. That data is added to frame_data['inference_input'], which is what Pipeless will pass to the model. def hook(frame_data, context): frame = frame_data["original"].view() yolo_input_shape = (640, 640, 3) # h,w,c frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = resize_rgb_frame(frame, yolo_input_shape) frame = cv2.normalize(frame, None, 0.0, 1.0, cv2.NORM_MINMAX) frame = np.transpose(frame, axes=(2,0,1)) # Convert to c,h,w inference_inputs = frame.astype("float32") frame_data['inference_input'] = inference_inputs ... (some other auxiliary functions that we call from the hook function) We also have the process.json file, which tells Pipeless which inference runtime to use (in this case, the ONNX Runtime), where to find the model that it should load, and some optional parameters for it, such as the execution_provider to use, i.e., CPU, CUDA, TensorRT, etc. (a small standalone sketch at the end of this article illustrates what such a runtime does under the hood). { "runtime": "onnx", "model_uri": "https://pipeless-public.s3.eu-west-3.amazonaws.com/yolov8n.onnx", "inference_params": { "execution_provider": "tensorrt" } } Finally, the post-process.py file defines a function similar to the one at pre-process.py.
This time, it takes the inference output that Pipeless stored at frame_data["inference_output"] and performs the operations to parse that output into bounding boxes. Later, it draws the bounding boxes over the frame, to finally assign the modified frame to frame_data['modified']. With that, Pipeless will forward the stream that we provide but with the modified frames including the bounding boxes. def hook(frame_data, _): frame = frame_data['original'] model_output = frame_data['inference_output'] yolo_input_shape = (640, 640, 3) # h,w,c boxes, scores, class_ids = parse_yolo_output(model_output, frame.shape, yolo_input_shape) class_labels = [yolo_classes[id] for id in class_ids] for i in range(len(boxes)): draw_bbox(frame, boxes[i], class_labels[i], scores[i]) frame_data['modified'] = frame ... (some other auxiliar functions that we call from the hook function) The final step is to start Pipeless and provide a stream. To start Pipeless, simply run the following command from the my-project directory: pipeless start --stages-dir . Once running, let's provide a stream from the webcam (v4l2) and show the output directly on the screen. Note we have to provide the list of stages that the stream should execute in order; in our case, it is just the onnx-yolo stage: pipeless add stream --input-uri "v4l2" --output-uri "screen" --frame-path "onnx-yolo" And that's all! Conclusion We have described how creating a computer vision application is a complex task due to many factors and the subsystems that we have to implement around it. With a framework like Pipeless, getting up and running takes just a few minutes and you can focus just on writing the code for your specific use case. Furthermore, Pipeless' stages are highly reusable and easy to maintain so the maintenance will be easy and you will be able to iterate very fast. If you want to get involved with Pipeless and contribute to its development, you can do so through its GitHub repository.
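As a side note, if you are curious what the inference runtime configured in process.json does behind the scenes, the sketch below runs a YOLO-style ONNX model directly with the onnxruntime Python package, outside of Pipeless. The local model path, the sample image, and the 640x640 preprocessing are assumptions mirroring the example above; real input and output names should always be read from the session rather than hard-coded.

Python
import cv2
import numpy as np
import onnxruntime as ort

# Load the model; ONNX Runtime picks the first available execution provider from this list
session = ort.InferenceSession(
    "yolov8n.onnx",  # assumed local copy of the model referenced in process.json
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
)

# Query the input/output names defined by the model
input_name = session.get_inputs()[0].name
output_names = [o.name for o in session.get_outputs()]

# Minimal preprocessing mirroring the pre-process hook: BGR -> RGB, resize, normalize, NCHW
frame = cv2.imread("example.jpg")  # stand-in for a single video frame
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = cv2.resize(frame, (640, 640))
frame = frame.astype("float32") / 255.0
frame = np.transpose(frame, (2, 0, 1))[np.newaxis, :]  # shape (1, 3, 640, 640)

# This single call is the step Pipeless performs between the pre- and post-process hooks
outputs = session.run(output_names, {input_name: frame})
print([o.shape for o in outputs])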
Optimizing I/O workloads in Python typically involves understanding where the bottlenecks are and then applying strategies to reduce or manage these bottlenecks. Profiling is a crucial step in this process as it helps identify the parts of the code that are most resource-intensive. Here's a step-by-step guide to optimizing I/O workloads by profiling in Python: Identify the I/O Workloads Comprehending the type of your I/O workloads is essential as a first step. Do they involve disk I/O, such as file read/write operations, network I/O, which includes data transmission over a network, or database I/O, comprising database interactions? Distinct optimization techniques apply to each category. I have taken up the I/O bottlenecks related to Network and file read/write operations for this article. Use Profiling Tools There are several tools available for profiling Python code: cProfile: cProfile is the most commonly used profiler in Python. cProfile is generally advised for most users due to being a C extension with manageable overhead, making it appropriate for profiling programs that run for extended periods. It is used widely for several reasons. Built-in and Standard: cProfile is a part of the Python Standard Library, which means it's readily available in any standard Python installation without additional packages. Low Overhead: As a C extension, cProfile introduces relatively low overhead compared to some pure Python profilers. This feature makes it suitable for profiling longer-running applications where the profiler's impact on performance is a concern. General-purpose Profiling: cProfile is suitable for most profiling needs, balancing detail and usability. It gives you a function-by-function breakdown of execution time, mainly needed to identify performance bottlenecks. Wide Acceptance and Community Support: Given its status as part of the standard library and ease of use, cProfile has a broad user base and community support. While cProfile is the most commonly used, it's important to note that the best profiler for a given task can depend on the project's specific needs. For instance, line_profiler is preferred for line-by-line analysis, and memory_profiler is used when memory usage is the primary concern. The choice of a profiler often depends on the specific aspect of the application you want to optimize. Line_profiler: Line_Profiler is a tool in Python that provides line-by-line profiling of your code, allowing you to see the performance of each line. This level of granularity can be beneficial when you're trying to optimize your code and need to understand where the bottlenecks are. Memory_profiler: This profiler is helpful if you suspect that memory usage is related to I/O inefficiency. 3. Analyze the Profile Data After running the profiler, analyze the data to find where most of the time is spent. Generally, the profiling output will indicate Long-running I/O operations, Repeated I/O operations that could be batched, and Unnecessary I/O operations that could be eliminated. 4. Apply Optimization Strategies Based on the findings, you can apply different strategies: Caching: Store data in memory to avoid repeated I/O operations. Batching: Combine multiple I/O operations into a single one to reduce overhead. Asynchronous I/O: Use asyncio or other asynchronous programming techniques to perform I/O operations without blocking the main thread. Buffering: Use buffers to reduce the number of I/O calls, especially for disk I/O. 
Data Compression: Reducing the size of the data being read or written can improve I/O performance, particularly for network and disk I/O. Parallelism: Use multi-threading or multi-processing to perform I/O operations in parallel, especially when dealing with network I/O. 5. Test and Iterate After applying optimizations, profile your code again to see the impact. Continue the cycle iteratively: optimize, profile, analyze, and change. 6. Other Considerations Ensure that your hardware is not a limiting factor. For database I/O, look into optimizing your database queries and indices. For file I/O, consider the file system and hardware it's running on. 7. Documentation and Community Resources Read the documentation of the profiling tools you use for more detailed guidance. Engage with Python communities or forums for specific advice and best practices. Remember, optimization is often about trade-offs, and focusing on the parts of your code that will yield the most significant improvements is essential. Weather Station Data Analysis and Profiling I have taken an example of analyzing weather station data. The weather station records the hourly temperature and has the columns below. Plain Text "STATION","DATE","SOURCE","LATITUDE","LONGITUDE","ELEVATION","NAME","REPORT_TYPE","CALL_SIGN","QUALITY_CONTROL","WND","CIG","VIS","TMP","DEW","SLP","AA1","AA2","AA3","AJ1","KA1","KA2","OC1","OD1","OD2","REM" Of all the columns, only STATION and TMP are needed for our analysis. The analysis follows the steps below (a short sketch of driving cProfile programmatically is shown right after this list; the full program is then profiled from the command line): Create a Python program that accepts the parameters (a station list separated by commas, and a year range given as start and end year separated by a hyphen). Download the weather station data as CSV files. Parse the CSVs and get all the temperatures for the station list and the year range provided in the parameters. Find the maximum, minimum, and average temperatures for the stations over the year range. Profile the code. Analyze the I/O bottleneck. Implement local caching. Analyze the output and the runtime.
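Before moving to the weather program itself, here is the short sketch of driving cProfile programmatically mentioned above. It profiles a deliberately simple, I/O-bound placeholder function and prints the same cumulative-time view that the -s cumulative command-line flag produces in the next sections; the URL is just a placeholder.

Python
import cProfile
import io
import pstats
import urllib.request

def fetch_pages(urls):
    # A simple, I/O-bound function to profile (placeholder URLs)
    bodies = []
    for url in urls:
        with urllib.request.urlopen(url) as resp:
            bodies.append(resp.read())
    return bodies

profiler = cProfile.Profile()
profiler.enable()
fetch_pages(["https://www.example.com"] * 3)
profiler.disable()

# Sort by cumulative time, mirroring "python -m cProfile -s cumulative ..."
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs().sort_stats("cumulative").print_stats(10)  # show the top 10 entries
print(stream.getvalue())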
Code Without Local Cache This program downloads the weather data for the specified stations and calculates low and high weather for the given year: Python import csv import sys import requests import collections from statistics import mean # This function downloads the weather data for station/year and write the output as a csv file def download_weather_station_data(station, year): my_url = generic_url.format(station=station, year=year) req = requests.get(my_url) if req.status_code != 200: return with open(generic_file.format(station=station, year=year), 'w') as sf: sf.write(req.text) # This parent function downloads the weather data for the given station list and year range def download_all_weather_station_data(stations_list, start_year, end_year): for station in stations_list: for year in range(start_year, end_year + 1): download_weather_station_data(station, year) # This function gets the temperature details from the file def get_file_temperature(file_name): with open(file_name, 'r') as tf: reader = csv.reader(tf) header = next(reader) for row in reader: station = row[header.index("STATION")] temp = row[header.index("TMP")] temperature, status = temp.split(",") if int(status) != 1: continue temperature = int(temperature) / 10 yield temperature # This parent function gets all the temperatures for the given station and year def get_temperatures_all(stations_list, start_year, end_year): temperatures = collections.defaultdict(list) for station in stations_list: for year in range(start_year, end_year + 1): for temperature in get_file_temperature(generic_file.format(station=station, year=year)): temperatures[station].append(temperature) return temperatures # This function gets the maximum/minimum/average temperature for the station over the given years def get_temperatures(lst_temperatures, calc_mode): result = {} for mode in calc_mode: if mode == 'max': result[mode] = {station: max(temperatures) for station, temperatures in lst_temperatures.items()} elif mode == 'min': result[mode] = {station: min(temperatures) for station, temperatures in lst_temperatures.items()} else: result[mode] = {station: mean(temperatures) for station, temperatures in lst_temperatures.items()} return result # Main Function if __name__ := "__main__": stations = sys.argv[1].split(",") years = [int(year) for year in sys.argv[2].split("-")] first_year = years[0] last_year = years[1] generic_url = "https://www.ncei.noaa.gov/data/global-hourly/access/{year}/{station}.csv" generic_file = "Weather_station_{station}_{year}.csv" download_all_weather_station_data(stations, first_year, last_year) temperatures_all = get_temperatures_all(stations, first_year, last_year) temperatures_values = get_temperatures(temperatures_all, ['max', 'min', 'avg']) print(f"The temperatures are {temperatures_values}") Executed the code and got the desired output. 
python load_weather_data.py "01480099999,02110099999,02243099999" 2018-2023 Output: Plain Text The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.145012712693135, '02110099999': 0.23863829994401306, '02243099999': 3.383049058515579} Analyze the Code With CProfile: python -m cProfile -s cumulative load_weather_data.py "01480099999,02110099999,02243099999" 2018-2023 > load_weather_data_profile.txt Shell The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.1538004828081165, '02110099999': 0.23863829994401306, '02243099999': 3.383049058515579} 1422783 function calls (1416758 primitive calls) in 17.250 seconds Ordered by: cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 181/1 0.002 0.000 17.250 17.250 {built-in method builtins.exec} 1 0.000 0.000 17.250 17.250 load_weather_data.py:1(<module>) 1 0.003 0.003 16.241 16.241 load_weather_data.py:23(download_all_weather_station_data) 18 0.003 0.000 16.221 0.901 load_weather_data.py:12(download_weather_station_data) The function call download_all_weather_station_data has taken the most run time, leaving the scope to optimize I/O. Since the data is static, there is no need to generate the CSV files again once generated. The program below is optimized not to generate the files again if they have already been generated. Python """This program downloads the weather data for the specified stations and calculates low and high weather for the given year""" import os import csv import sys import fnmatch import requests import collections from statistics import mean # This function downloads the weather data for station/year and write the output as a csv file def download_weather_station_data(station, year): my_url = generic_url.format(station=station, year=year) req = requests.get(my_url) if req.status_code != 200: return with open(generic_file.format(station=station, year=year), 'w') as sf: sf.write(req.text) # This parent function downloads the weather data for the given station list and year range def download_all_weather_station_data(stations_list, start_year, end_year): for station in stations_list: for year in range(start_year, end_year + 1): if not os.path.exists(generic_file.format(station=station, year=year)): download_weather_station_data(station, year) # This function gets the temperature details from the file def get_file_temperature(file_name): with open(file_name, 'r') as tf: reader = csv.reader(tf) header = next(reader) for row in reader: station = row[header.index("STATION")] temp = row[header.index("TMP")] temperature, status = temp.split(",") if int(status) != 1: continue temperature = int(temperature) / 10 yield temperature # This parent function gets all the temperatures for the given station and year def get_temperatures_all(stations_list, start_year, end_year): temperatures = collections.defaultdict(list) for station in stations_list: for year in range(start_year, end_year + 1): if os.path.exists(generic_file.format(station=station, year=year)): for temperature in get_file_temperature(generic_file.format(station=station, year=year)): temperatures[station].append(temperature) return temperatures # This function gets the maximum/minimum/average temperature for the station over the given years def 
get_temperatures(lst_temperatures, calc_mode): result = {} for mode in calc_mode: if mode == 'max': result[mode] = {station: max(temperatures) for station, temperatures in lst_temperatures.items()} elif mode == 'min': result[mode] = {station: min(temperatures) for station, temperatures in lst_temperatures.items()} else: result[mode] = {station: mean(temperatures) for station, temperatures in lst_temperatures.items()} return result # Main Function if __name__ := "__main__": stations = sys.argv[1].split(",") years = [int(year) for year in sys.argv[2].split("-")] first_year = years[0] last_year = years[1] generic_url = "https://www.ncei.noaa.gov/data/global-hourly/access/{year}/{station}.csv" generic_file = "Weather_station_{station}_{year}.csv" current_directory = os.getcwd() download_all_weather_station_data(stations, first_year, last_year) count = len(fnmatch.filter(os.listdir(current_directory), '*.csv')) if count > 0: temperatures_all = get_temperatures_all(stations, first_year, last_year) temperatures_values = get_temperatures(temperatures_all, ['max', 'min', 'avg']) print(f"The temperatures are {temperatures_values}") else: print(f"There are no file(s) available for the given stations {sys.argv[1]} and years {sys.argv[2]}") Executed the code and got the desired output. python load_weather_data_cache.py "01480099999,02110099999,02243099999" 2018-2023 Result: Shell The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.145012712693135, '02110099999': 0.2386 3829994401306, '02243099999': 3.383049058515579} Analyze the Code With CProfile: python -m cProfile -s cumulative load_weather_data_cache.py "01480099999,02110099999,02243099999" 2018-2023 > load_weather_data_cache_profile.txt Shell The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.1538004828081165, '02110099999': 0.23863829994401306, '02243099999': 3.383049058515579} 1142084 function calls (1136170 primitive calls) in 1.005 seconds Ordered by: cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 179/1 0.002 0.000 1.005 1.005 {built-in method builtins.exec} 1 0.000 0.000 1.005 1.005 load_weather_data_cache.py:1(<module>) 1 0.040 0.040 0.670 0.670 load_weather_data_cache.py:50(get_temperatures_all) 119125 0.516 0.000 0.619 0.000 load_weather_data_cache.py:33(get_file_temperature) 17 0.000 0.000 0.367 0.022 __init__.py:1(<module>) The function call download_all_weather_station_data is not appearing anymore as the most run time. The overall runtime decreased by approximately 16 times. Conclusion Caches, as demonstrated in this example, have the potential to accelerate code by several orders of magnitude. However, managing caches can be challenging and often leads to bugs. In the given example, the files remain static over time, but it's worth noting that there are numerous scenarios in which the cached data could change. In such situations, the code responsible for cache management must be capable of identifying and addressing these changes.
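One simple way to guard against stale cache entries in an example like this one is to treat cached files as valid only for a limited time and re-download them otherwise. The sketch below is a generic illustration of that idea and is not part of the article's code; the one-week maximum age and the helper names are arbitrary choices.

Python
import os
import time
import requests

MAX_AGE_SECONDS = 7 * 24 * 3600  # treat cached files older than a week as stale (arbitrary)

def is_cache_fresh(path, max_age=MAX_AGE_SECONDS):
    # True if the file exists and was modified recently enough to trust
    return os.path.exists(path) and (time.time() - os.path.getmtime(path)) < max_age

def download_if_stale(url, path):
    # Re-download only when the cached copy is missing or too old
    if is_cache_fresh(path):
        return
    req = requests.get(url)
    if req.status_code != 200:
        return
    with open(path, 'w') as sf:
        sf.write(req.text)

# Example usage with the same URL/file patterns as the weather program
url = "https://www.ncei.noaa.gov/data/global-hourly/access/2023/01480099999.csv"
download_if_stale(url, "Weather_station_01480099999_2023.csv")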
Reza Rahman
Principal Program Manager, Java on Azure,
Microsoft
Kai Wähner
Technology Evangelist,
Confluent
Alvin Lee
Founder,
Out of the Box Development, LLC