Streaming

The OCI Streaming service provides a scalable messaging system with durable storage. It is a fully managed service that can be used for ingesting continuous streams of data. The Streaming service is suited for building web-scale applications and microservices that use a message-driven architecture. These applications are typically designed around data that is produced and processed continually and sequentially in a Pub/Sub messaging model. The OCI Streaming service is also suited for applications that ingest logs, metrics, and operational telemetry, as well as other fast data streams, such as website clickstreams.

As a fully managed service, OCI Streaming manages all the infrastructure needed to operate and scale the service, from provisioning, deployment, maintenance, and replication to configuration of the hardware and software that enables you to stream data. As a user of the service, you create a stream and configure its partitions. Streams and partitions are resources provided by the service; they are discussed in the next section.

You can securely put and get your data from Streaming through SSL endpoints using the HTTPS protocol. The service ensures that user data is encrypted both at rest and in transit, and you can bring your own encryption keys that you manage in the OCI Vault service. Streams also support private endpoints, which restrict access to your streaming endpoint to your virtual cloud network (VCN) and prevent access through the Internet.

Understanding the Streaming Service

The OCI Streaming service is a fully managed service. As such, it exposes a resource called a stream that encapsulates the infrastructure required to operate a messaging system and manage its lifecycle. Developers first create a stream in the streaming service using the console, CLI, Terraform, or the APIs. A stream is the primary resource you interact with, and it can be thought of as an append-only log. Streams are organized into stream pools that provide a way to manage the settings for all the streams in a pool. If you do not explicitly associate a stream with a stream pool, the stream is created in the default stream pool.

After a stream has been created, applications can publish messages to it. In most cases, applications use the OCI SDK or the APIs directly to publish messages. You can also use the OCI console and the CLI to send messages to your stream for testing. Another popular way for applications to interact with the streaming service is to use the Kafka APIs, which the streaming service supports. Every message consists of a key and a value, both of which can be set by the developer. Listing 3-3 shows a snippet of code using the Python SDK to publish a message. Multiple publisher applications can publish messages to the stream at the same time.

Subscribers or subscriber applications can consume messages from the stream either individually or as part of a consumer group. The streaming APIs and SDK offer many options for consumers to control how messages are delivered to them.

As applications publish messages to a stream, each message is routed to and stored on one of the stream's partitions, which are managed by the streaming service. Each partition stores a subset of the messages that were published. Having multiple partitions allows message consumers to consume messages from multiple partitions at the same time. Because publishers and subscribers can use partitions in parallel, the number of partitions affects the message throughput of the stream. Each partition has its own limits, however: 1MBps of data writes and 5 get requests per second from each consumer group. When you create a stream, you must specify the number of partitions it should use. After the stream is created, the number of partitions cannot be changed. Figure 3-12 shows an overview of how applications can publish and receive messages using the streaming service, as well as the various components of the streaming service itself.
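Because the partition count is fixed at creation time, it helps to estimate it up front. The following is a minimal back-of-the-envelope sketch, not an official sizing formula: the function name partitions_needed and the sample workload are hypothetical, and the calculation considers only the 1MBps per-partition write limit quoted above, ignoring read limits and headroom for growth.

```python
import math

# Per-partition write limit quoted in the text (1 MBps).
WRITE_LIMIT_MB_PER_SEC = 1.0


def partitions_needed(peak_write_mb_per_sec: float) -> int:
    """Return the smallest partition count that can absorb the peak write rate."""
    if peak_write_mb_per_sec <= 0:
        raise ValueError("peak write rate must be positive")
    return max(1, math.ceil(peak_write_mb_per_sec / WRITE_LIMIT_MB_PER_SEC))


if __name__ == "__main__":
    # Hypothetical workload: 3.5 MB of messages per second at peak.
    print(partitions_needed(3.5))
```

In practice you would probably round the result up further, because the number cannot be raised later without creating a new stream.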

FIGURE 3-12 An OCI Stream Showing Various Partitions and How Publishers and Subscribers Can Communicate Using Messages

Working with the OCI Streaming Service

A producer publishes a message onto the stream. The various SDKs for languages such as Java, Python, Go, JavaScript, and TypeScript provide wrapper methods to access the streaming APIs. A single call to publish messages can include multiple messages, but the total size of the payload must be 1 mebibyte (MiB) or less. Each message that is published to the stream should contain a key and a value. If there is more than one partition, the streaming service uses the message key to determine the partition where the message is published. Two messages with different keys could potentially be published on the same partition; however, messages with the same key always go to the same partition. If you do not specify a key, the service considers the message to have a null key and generates a random key for it. This keeps messages with null keys from all piling up on the same partition, which would create an accidental hot spot and reduce the throughput of the system.

Listing 3-3 shows a snippet of Python code that connects to a stream on the OCI Streaming service and publishes two messages with a single call. The Python SDK loads the configuration it needs to connect and authenticate with OCI from a file and then creates a streaming client.

Listing 3-3 Example Code to Publish a Message Using the Python SDK

from base64 import b64encode

import oci

config = oci.config.from_file()
streaming_client = oci.streaming.StreamClient(
    config, "https://service_endpoint.url")


def encode(text):
    # The Streaming API expects keys and values as base64-encoded strings
    return b64encode(text.encode()).decode()


streaming_client.put_messages(
    stream_id="<stream_OCID>",
    put_messages_details=oci.streaming.models.PutMessagesDetails(
        messages=[
            oci.streaming.models.PutMessagesDetailsEntry(
                value=encode("FirstMessage"),
                key=encode("key_one")),
            oci.streaming.models.PutMessagesDetailsEntry(
                value=encode("SecondMessage"),
                key=encode("key_two"))]))

The StreamClient provides the function put_messages, which wraps the streaming service’s API for publishing messages. It requires the stream ID, which is the OCID for the stream, as well as a list of messages to publish. As mentioned previously, there is no limit to the number of messages that can be included in this function call, as long as the total size of the payload is 1 MiB or less. The message keys can be up to 256 bytes. The SDKs for other languages provide similar constructs.
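A producer with many messages to send can stay under the per-call payload limit by grouping them into batches before publishing. The sketch below is one possible approach and is not part of the OCI SDK: the helper name batch_messages is hypothetical, and it counts only the raw key and value bytes. The encoded request is larger than the raw bytes, so a real producer should leave some margin below the limit.

```python
from typing import Iterable, Iterator, List, Tuple

MAX_BATCH_BYTES = 1024 * 1024  # 1 MiB, the per-call payload limit from the text

Message = Tuple[bytes, bytes]  # (key, value)


def batch_messages(messages: Iterable[Message],
                   max_bytes: int = MAX_BATCH_BYTES) -> Iterator[List[Message]]:
    """Yield lists of messages whose combined key and value size fits in max_bytes."""
    batch: List[Message] = []
    batch_size = 0
    for key, value in messages:
        size = len(key) + len(value)
        if size > max_bytes:
            raise ValueError("a single message exceeds the payload limit")
        # Start a new batch when adding this message would exceed the limit.
        if batch and batch_size + size > max_bytes:
            yield batch
            batch, batch_size = [], 0
        batch.append((key, value))
        batch_size += size
    if batch:
        yield batch
```

Each yielded batch could then be passed to a single publish call, such as the put_messages call in Listing 3-3.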

Consumer applications consume messages from a stream using the API or the SDKs in a manner similar to how a producer publishes messages onto the stream. A consumer needs to start consuming messages from some point in the stream. Consumers use a cursor, which is a pointer to a specific location within a stream, to do this. Messages then are consumed starting with the one that the cursor points to. The streaming service guarantees that the messages from a partition are always delivered in the same order they were produced. After a cursor has been created, the consumer uses the GetMessages API to fetch messages. Similar to publishing messages, a single call to the GetMessages API returns multiple messages. By default, the number of messages that are batched inside a single response is based on the average message size, so as to not exceed the stream’s throughput. You can also specify the number of messages to be returned, as long as you do not exceed the throughput of the stream. As the number of messages returned from a call to the GetMessages API can vary based on the message size, the call also returns a cursor for use with the next GetMessages call. The cursor is returned as a response header value in the custom header opc-next-cursor. The next call to GetMessages can use the value returned in the header as the cursor parameter, to get the next batch of messages.
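The cursor hand-off described above follows a simple loop: request a batch, process it, and pass the returned cursor to the next request. The sketch below illustrates only that pattern with an in-memory stand-in. The class FakePartition and the function drain are hypothetical and are not the OCI SDK, and the cursor here is a plain integer index, whereas the service returns an opaque cursor value in the opc-next-cursor header.

```python
from typing import List, Tuple


class FakePartition:
    """In-memory stand-in for one partition; NOT the OCI SDK."""

    def __init__(self, messages: List[str]) -> None:
        self._messages = messages

    def get_messages(self, cursor: int, limit: int) -> Tuple[List[str], int]:
        """Return up to `limit` messages starting at `cursor`, plus the next cursor."""
        batch = self._messages[cursor:cursor + limit]
        return batch, cursor + len(batch)


def drain(partition: FakePartition, cursor: int = 0, limit: int = 2) -> List[str]:
    """Read until an empty batch comes back, always passing on the returned cursor."""
    consumed: List[str] = []
    while True:
        batch, cursor = partition.get_messages(cursor, limit)
        if not batch:
            return consumed
        consumed.extend(batch)
```

A long-running consumer would typically keep polling after an empty batch instead of returning, because new messages can arrive at any time.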

Individual consumers can start consuming messages from different relative points in the stream using different types of cursors. The types of cursors include ones that point to the following:

  • A specific time (cursor type AT_TIME)

  • The earliest message available on the stream (cursor type TRIM_HORIZON)

  • A relative position within the messages on the partition, called an offset (cursor type AT_OFFSET or AFTER_OFFSET)

  • Only messages published after the cursor has been created (cursor type LATEST)

This enables consumers to keep track of the various partitions, the position of the last message the consuming application has consumed from the partition, and from what position in the partition the consuming application needs to start in case it is interrupted or terminated and needs to restart consuming from where it left off.
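To make the cursor types concrete, the following sketch models one partition as an in-memory list and computes where each type of cursor would start reading. It is an illustration only: the names Record and start_index are hypothetical, and the exact boundary behavior (for example, that AT_TIME starts at the first message at or after the given time) is an assumption rather than something the text specifies.

```python
from typing import Callable, List, NamedTuple, Optional


class Record(NamedTuple):
    offset: int
    timestamp: float  # seconds since the epoch
    value: str


def _first_index(records: List[Record], matches: Callable[[Record], bool]) -> int:
    """Index of the first matching record, or len(records) if none match."""
    return next((i for i, r in enumerate(records) if matches(r)), len(records))


def start_index(records: List[Record], cursor_type: str,
                offset: Optional[int] = None,
                time: Optional[float] = None) -> int:
    """Return the list index of the first record a cursor of this type would read."""
    if cursor_type == "TRIM_HORIZON":
        return 0  # earliest message still available
    if cursor_type == "LATEST":
        return len(records)  # only messages published later are seen
    if cursor_type in ("AT_OFFSET", "AFTER_OFFSET"):
        if offset is None:
            raise ValueError("offset is required for offset cursors")
        if cursor_type == "AT_OFFSET":
            return _first_index(records, lambda r: r.offset >= offset)
        return _first_index(records, lambda r: r.offset > offset)
    if cursor_type == "AT_TIME":
        if time is None:
            raise ValueError("time is required for AT_TIME cursors")
        return _first_index(records, lambda r: r.timestamp >= time)
    raise ValueError(f"unknown cursor type: {cursor_type}")
```

A consumer that stores the offset of the last message it processed can resume with an AFTER_OFFSET cursor for that offset after a restart.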

Consumers can also be grouped into ConsumerGroups that coordinate the consumption of messages from a stream. In streams that have numerous partitions, keeping track of offsets and partitions while dynamically scaling the number of consumers can be cumbersome. ConsumerGroups can push to the streaming service most of the heavy lifting required to manage offsets and partitions when consumers are scaled up or down. This helps developers focus on what to do with messages instead of having to orchestrate message consumption. ConsumerGroups consist of multiple consumers, called instances. The ConsumerGroups automatically manage offset tracking, assign the various instances in the group to specific partitions, and balance the group as instances are created and removed in the ConsumerGroups. ConsumerGroups are more efficient and practical for most purposes than individual consumers simply because of the benefits they provide at no extra cost. ConsumerGroups use a cursor called a GroupCursor, which creates a group name and instance name association, in addition to performing the duties of a normal cursor. The first time a GroupCursor is created with a new group name, the ConsumerGroup by that name is created. When a group cursor is created with an existing group name and a new instance name, the consumer that requested the group cursor is added to the group as a new instance in the group. Each instance in a group is assigned a partition, and an instance may be assigned more than one partition. However, two instances will never be assigned to a single partition; if a ConsumerGroup has more instances than partitions, the extra instances remain idle. ConsumerGroups automatically remove instances that have not consumed messages for more than 30 seconds. In these cases, the idle instances in the ConsumerGroup are assigned to a partition whose assigned instance has been removed.
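The assignment rules above (every partition has exactly one owner, an instance may own several partitions, and surplus instances stay idle) can be sketched as follows. The text does not describe the algorithm the service actually uses, so the round-robin strategy and the function name assign_partitions are assumptions made purely for illustration.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int],
                      instances: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over instances round-robin; each partition gets one owner."""
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    if not instances:
        return assignment
    for i, partition in enumerate(partitions):
        owner = instances[i % len(instances)]
        assignment[owner].append(partition)
    return assignment
```

With three partitions and two instances, one instance owns two partitions. With two partitions and three instances, the third instance receives an empty list and stays idle until another instance is removed and the function is run again over the remaining instances.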

The 30-second window to request additional messages essentially means that each consumer should limit the number of messages it requests to what it can process within 30 seconds. If a consumer takes longer than 30 seconds to process its messages and call GetMessages again, the service assumes that the consumer went offline and allocates the partition to an idle consumer. Data is not lost in these scenarios, though, because the default behavior of the GroupCursor is to commit messages on the next call to GetMessages. So in a scenario in which a consumer has been terminated, fails, or cannot process all messages within 30 seconds, the messages are not considered committed (or processed). The partition is allocated to another consumer (when one comes online in the group, if there are no idle consumers), and these messages are delivered to that consumer for processing again. Some of these messages might have been processed by the failed consumer before it failed, so they reach the second consumer as duplicates. This illustrates the “at least once” delivery model of the streaming service. How duplicate messages are handled is up to the consuming applications, which should be designed to account for multiple deliveries of the same message in situations like this one.
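One common way to tolerate at-least-once delivery is to make processing idempotent by remembering which messages have already been handled. The sketch below is a minimal illustration under stated assumptions: the class IdempotentProcessor is hypothetical, it identifies a message by its partition and offset, and it keeps the record of handled messages in memory.

```python
from typing import Callable, Iterable, Set, Tuple

Delivery = Tuple[int, int, str]  # (partition, offset, value)


class IdempotentProcessor:
    """Skips deliveries whose (partition, offset) pair was already processed."""

    def __init__(self, handler: Callable[[str], None]) -> None:
        self._handler = handler
        self._seen: Set[Tuple[int, int]] = set()

    def process(self, deliveries: Iterable[Delivery]) -> int:
        """Run the handler once per new message; return how many were handled."""
        handled = 0
        for partition, offset, value in deliveries:
            if (partition, offset) in self._seen:
                continue  # duplicate delivery; already handled
            self._handler(value)
            self._seen.add((partition, offset))
            handled += 1
        return handled
```

Because a redelivered message usually arrives at a different instance than the one that failed, a real deployment would need to keep the record of handled messages in storage shared by all instances, such as a database, instead of an in-memory set.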

Listing 3-4 shows a typical ConsumerGroup using a group cursor to consume messages.

Listing 3-4 Consumer Group Using a Group Cursor

import time
from base64 import b64decode

import oci

config = oci.config.from_file()
streaming_client = oci.streaming.StreamClient(
    config, "https://service_endpoint.url")
stream_id = "<stream_OCID>"

# Creating the group cursor also creates the group "group01" if it is new
cursor_details = oci.streaming.models.CreateGroupCursorDetails(
    group_name="group01",
    instance_name="instance01",
    type=oci.streaming.models.CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
    commit_on_get=True)
response = streaming_client.create_group_cursor(stream_id, cursor_details)
cursor = response.data.value

while True:
    get_response = streaming_client.get_messages(stream_id, cursor, limit=10)
    if not get_response.data:
        break

    # Process the messages; keys and values arrive base64-encoded
    print(" Read {} messages".format(len(get_response.data)))
    for message in get_response.data:
        key = b64decode(message.key).decode() if message.key else None
        value = b64decode(message.value).decode()
        print("{}: {}".format(key, value))
    time.sleep(1)
    # use the next-cursor for iteration
    cursor = get_response.headers["opc-next-cursor"]

Listing 3-4 shows a stream client being created. A group cursor is then created, which in turn creates a ConsumerGroup called group01. This consumer (instance) within the group is named instance01. The initial group cursor is used to call the get_messages API with a message limit of 10. The limit illustrates that every instance in a ConsumerGroup should restrict each request to what it can process within 30 seconds; a gap of more than 30 seconds between calls to the get_messages API causes the service to consider the instance offline, as previously discussed. After the messages are processed, the opc-next-cursor response header is read to get the cursor for the next call to get_messages. Note that, in this example, because commit_on_get is set to True when the GroupCursor is created, the first 10 messages that were returned are committed when the instance calls get_messages a second time. If this instance takes too long to process the first 10 messages or goes offline unexpectedly, these messages are not committed and they are delivered to another instance if and when one becomes available.

Service Connector Hub Integration

The Streaming service is integrated with the OCI Service Connector Hub. The OCI Service Connector Hub is a messaging bus that enables you to orchestrate data movement between services in OCI. Using the Service Connector Hub, you can define the source for the data, a set of tasks that you can optionally apply to the data to process it (such as transforming the data), and a target service to deliver the processed data. This enables use cases in which you use a stream as a data source, use Serverless Functions to transform the stream’s messages, and deliver the transformed messages to a target while maintaining Streaming’s order guarantees.

Kafka Compatibility

Streaming is compatible with most Kafka APIs, enabling you to use applications written for Kafka to send messages to and receive messages from the Streaming service without having to rewrite your code. Streaming makes it possible to offload the setup, maintenance, and management of the infrastructure that hosting your own Apache Kafka cluster requires. Streaming also takes advantage of the Kafka Connect ecosystem to interface directly with first-party and third-party products by using out-of-the-box Kafka source and sink connectors. At the time of writing, the service offers compatibility with the Kafka APIs outlined in Table 3-2.

Table 3-2 OCI Streaming Compatibility with Various Kafka APIs

Compatible          Incompatible
------------------  --------------------------
Producer            Compaction
Consumer            Transactions
Kafka Connect       Dynamic Partition Addition
Group Management    Idempotent Production
Admin               Kafka Streams

If you use Kafka APIs to publish messages to Streaming, you can choose to do custom partitioning and explicitly map messages to partitions. Although this gives you more control and predictability over what messages are sent to which partitions, the Streaming service avoids this, to keep from accumulating too many messages in the same partitions and creating “hotspots.” When developers take control over partitions with custom partitioning, they also take on the responsibility to avoid hotspots from having too many messages within the same partitions.
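The trade-off can be seen with a small key-hashing sketch. It is illustrative only: SHA-256 is an arbitrary choice made here and is not the hash used by the Streaming service or by Kafka's default partitioner, and the function names partition_for and partition_load are hypothetical.

```python
import hashlib
from collections import Counter
from typing import Iterable


def partition_for(key: bytes, partition_count: int) -> int:
    """Map a key to a partition deterministically (same key, same partition)."""
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


def partition_load(keys: Iterable[bytes], partition_count: int) -> Counter:
    """Count how many messages each partition would receive for these keys."""
    return Counter(partition_for(k, partition_count) for k in keys)
```

Calling partition_load with many distinct keys tends to spread messages across partitions, whereas a custom scheme that funnels most messages to one key or one partition shows up as a single large count, which is the hotspot the text warns about.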

The Kafka Connect support in OCI Streaming allows developers to leverage the Kafka Connect ecosystem of connectors to move data between systems. Several connectors make it easy to create integrations with Oracle platforms:

  • Kafka Connect JDBC, for working with the Oracle database

  • Oracle Integration Cloud

  • Oracle GoldenGate

  • Kafka Connect Amazon S3 connectors, which can use the Oracle Object Storage S3-compatible APIs

When using Kafka Connect, you need to create Kafka Connect Configurations called harnesses on the OCI Streaming service. A single harness can be used to configure multiple connectors and the harness needs to be created within the same compartment as the stream. Kafka Connect uses internal topics to track and manage connector and task configurations, offsets, and status. These internal topics are automatically created by the Streaming service and follow the convention <stream ocid>-{config|offset|status}. These topics can be configured in the distributed worker configuration file of the connector, typically connect-distributed.properties.
