Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
INF1802
kafka-producer-demo
Commits
e7e03e1b
Commit
e7e03e1b
authored
May 23, 2019
by
Maria Julia Dias de Lima
Browse files
Initial commit
parents
Pipeline
#7283
failed with stages
Changes
8
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
.gitignore
0 → 100644
View file @
e7e03e1b
# Eclipse
.classpath
.project
.settings/
# Intellij
.idea/
*.iml
*.iws
# Mac
.DS_Store
# Maven
log/
target/
pom.xml
0 → 100644
View file @
e7e03e1b
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<groupId>
tecgraf.inf1802.exercises
</groupId>
<artifactId>
kakfa-producer-demo
</artifactId>
<version>
1.0-SNAPSHOT
</version>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>
org.apache.kafka
</groupId>
<artifactId>
kafka-clients
</artifactId>
<version>
2.2.0
</version>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-simple
</artifactId>
<version>
1.7.26
</version>
</dependency>
<dependency>
<groupId>
com.fasterxml.jackson.core
</groupId>
<artifactId>
jackson-databind
</artifactId>
<version>
2.9.9
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
src/main/java/ProducerDemo.java
0 → 100644
View file @
e7e03e1b
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
java.util.Properties
;
public
class
ProducerDemo
{
public
static
void
main
(
String
[]
args
){
// Criar as propriedades do produtor
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
properties
.
setProperty
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
properties
.
setProperty
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
// Criar o produtor
KafkaProducer
<
String
,
String
>
producer
=
new
KafkaProducer
<
String
,
String
>(
properties
);
// Enviar as mensagens
ProducerRecord
<
String
,
String
>
record
=
new
ProducerRecord
<
String
,
String
>(
"meu_topico"
,
"bom dia!"
);
producer
.
send
(
record
);
// Envio assíncrono
producer
.
close
();
}
}
src/main/java/ProducerDemoKeys.java
0 → 100644
View file @
e7e03e1b
import
org.apache.kafka.clients.producer.*
;
import
org.apache.kafka.common.protocol.types.Field
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.Properties
;
import
java.util.concurrent.ExecutionException
;
public
class
ProducerDemoKeys
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
ProducerDemoKeys
.
class
.
getName
());
public
static
void
main
(
String
[]
args
)
{
// Criar as propriedades do produtor
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
properties
.
setProperty
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
properties
.
setProperty
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
// Criar o produtor
KafkaProducer
<
String
,
String
>
producer
=
new
KafkaProducer
<
String
,
String
>(
properties
);
// Enviar as mensagens
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
String
topic
=
"meu_topico"
;
String
value
=
"bom dia "
+
i
+
"!"
;
final
String
key
=
"id_"
+
i
;
ProducerRecord
<
String
,
String
>
record
=
new
ProducerRecord
<
String
,
String
>(
topic
,
key
,
value
);
producer
.
send
(
record
,
new
Callback
()
{
public
void
onCompletion
(
RecordMetadata
recordMetadata
,
Exception
e
)
{
if
(
e
==
null
)
{
logger
.
info
(
"Key :"
+
key
+
" -- Partição: "
+
recordMetadata
.
partition
());
}
else
{
logger
.
error
(
"Erro no envio da mensagem"
,
e
);
}
}
});
// Envio assíncrono
}
// Fecha o produtor
producer
.
close
();
}
}
src/main/java/ProducerDemoWithCallback.java
0 → 100644
View file @
e7e03e1b
import
org.apache.kafka.clients.producer.*
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.Properties
;
public
class
ProducerDemoWithCallback
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
ProducerDemoWithCallback
.
class
.
getName
());
public
static
void
main
(
String
[]
args
){
// Criar as propriedades do produtor
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
properties
.
setProperty
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
properties
.
setProperty
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
// Criar o produtor
KafkaProducer
<
String
,
String
>
producer
=
new
KafkaProducer
<
String
,
String
>(
properties
);
// Enviar as mensagens
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
ProducerRecord
<
String
,
String
>
record
=
new
ProducerRecord
<
String
,
String
>(
"meu_topico"
,
"bom dia "
+
i
+
"!"
);
producer
.
send
(
record
,
new
Callback
()
{
public
void
onCompletion
(
RecordMetadata
recordMetadata
,
Exception
e
)
{
if
(
e
==
null
)
{
logger
.
info
(
"Exibindo os meta-dados sobre o envio da mensagem. \n"
+
"Topico: "
+
recordMetadata
.
topic
()
+
"\n"
+
"Partição: "
+
recordMetadata
.
partition
()
+
"\n"
+
"Offset"
+
recordMetadata
.
offset
());
}
else
{
logger
.
error
(
"Erro no envio da mensagem"
,
e
);
}
}
});
// Envio assíncrono
}
// Fecha o produtor
producer
.
close
();
}
}
src/main/java/ProducerDemoWithSerializer.java
0 → 100644
View file @
e7e03e1b
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
org.apache.kafka.clients.producer.*
;
import
org.apache.kafka.common.serialization.Serializer
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.time.LocalDate
;
import
java.util.Date
;
import
java.util.Map
;
import
java.util.Properties
;
public
class
ProducerDemoWithSerializer
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
ProducerDemoWithSerializer
.
class
.
getName
());
public
static
void
main
(
String
[]
args
){
// Criar as propriedades do produtor
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
properties
.
setProperty
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
properties
.
setProperty
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
UserSerializer
.
class
.
getName
());
// Criar o produtor
KafkaProducer
<
String
,
User
>
producer
=
new
KafkaProducer
<>(
properties
);
// Enviar as mensagens
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
String
name
=
"user"
+
i
;
LocalDate
birthday
=
LocalDate
.
now
().
plusDays
(
i
);
User
user
=
new
User
(
"user"
+
i
,
birthday
);
ProducerRecord
<
String
,
User
>
record
=
new
ProducerRecord
<>(
"meu_topico"
,
user
);
producer
.
send
(
record
,
new
Callback
()
{
public
void
onCompletion
(
RecordMetadata
recordMetadata
,
Exception
e
)
{
if
(
e
==
null
)
{
logger
.
info
(
"Exibindo os meta-dados sobre o envio da mensagem. \n"
+
"Topico: "
+
recordMetadata
.
topic
()
+
"\n"
+
"Partição: "
+
recordMetadata
.
partition
()
+
"\n"
+
"Offset"
+
recordMetadata
.
offset
());
}
else
{
logger
.
error
(
"Erro no envio da mensagem"
,
e
);
}
}
});
// Envio assíncrono
}
// Fecha o produtor
producer
.
close
();
}
}
src/main/java/User.java
0 → 100644
View file @
e7e03e1b
import
java.time.LocalDate
;
public
class
User
{
public
String
name
;
public
LocalDate
birthday
;
public
User
(
String
name
,
LocalDate
birthday
)
{
this
.
name
=
name
;
this
.
birthday
=
birthday
;
}
}
src/main/java/UserSerializer.java
0 → 100644
View file @
e7e03e1b
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
org.apache.kafka.common.serialization.Serializer
;
import
java.util.Map
;
public
class
UserSerializer
implements
Serializer
<
User
>
{
ObjectMapper
mapper
=
new
ObjectMapper
();
@Override
public
byte
[]
serialize
(
String
s
,
User
user
)
{
try
{
return
mapper
.
writeValueAsBytes
(
user
);
}
catch
(
JsonProcessingException
e
)
{
e
.
printStackTrace
();
return
null
;
}
}
@Override
public
void
configure
(
Map
<
String
,
?>
map
,
boolean
b
)
{
}
@Override
public
void
close
()
{
}
}
\ No newline at end of file
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment