init
This commit is contained in:
parent
0a1bc51379
commit
bc88719107
3
.gitignore
vendored
3
.gitignore
vendored
@ -129,3 +129,6 @@ dmypy.json
|
|||||||
# Pyre type checker
|
# Pyre type checker
|
||||||
.pyre/
|
.pyre/
|
||||||
|
|
||||||
|
test
|
||||||
|
.idea
|
||||||
|
|
||||||
|
21
Pipfile
Normal file
21
Pipfile
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
[[source]]
|
||||||
|
url = "https://pypi.douban.com/simple"
|
||||||
|
verify_ssl = false
|
||||||
|
name = "pypi"
|
||||||
|
|
||||||
|
[packages]
|
||||||
|
fastapi = "*"
|
||||||
|
aioredis = "*"
|
||||||
|
kafka-python = "*"
|
||||||
|
uvicorn = "*"
|
||||||
|
requests = "*"
|
||||||
|
pipfile = "*"
|
||||||
|
gunicorn = "*"
|
||||||
|
uvloop = "*"
|
||||||
|
httptools = "*"
|
||||||
|
typing-extensions = "*"
|
||||||
|
|
||||||
|
[dev-packages]
|
||||||
|
|
||||||
|
[requires]
|
||||||
|
python_version = "3.8"
|
264
Pipfile.lock
generated
Normal file
264
Pipfile.lock
generated
Normal file
@ -0,0 +1,264 @@
|
|||||||
|
{
|
||||||
|
"_meta": {
|
||||||
|
"hash": {
|
||||||
|
"sha256": "9999b5b71da727c1482bc015f3bdf684ddc91cb7b48163e72da90d5c5b120726"
|
||||||
|
},
|
||||||
|
"pipfile-spec": 6,
|
||||||
|
"requires": {
|
||||||
|
"python_version": "3.8"
|
||||||
|
},
|
||||||
|
"sources": [
|
||||||
|
{
|
||||||
|
"name": "pypi",
|
||||||
|
"url": "https://pypi.douban.com/simple",
|
||||||
|
"verify_ssl": false
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default": {
|
||||||
|
"aioredis": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:15f8af30b044c771aee6787e5ec24694c048184c7b9e54c3b60c750a4b93273a",
|
||||||
|
"sha256:b61808d7e97b7cd5a92ed574937a079c9387fdadd22bfbfa7ad2fd319ecc26e3"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==1.3.1"
|
||||||
|
},
|
||||||
|
"async-timeout": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f",
|
||||||
|
"sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"
|
||||||
|
],
|
||||||
|
"markers": "python_full_version >= '3.5.3'",
|
||||||
|
"version": "==3.0.1"
|
||||||
|
},
|
||||||
|
"certifi": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:1a4995114262bffbc2413b159f2a1a480c969de6e6eb13ee966d470af86af59c",
|
||||||
|
"sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830"
|
||||||
|
],
|
||||||
|
"version": "==2020.12.5"
|
||||||
|
},
|
||||||
|
"chardet": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa",
|
||||||
|
"sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
|
||||||
|
"version": "==4.0.0"
|
||||||
|
},
|
||||||
|
"click": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a",
|
||||||
|
"sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
|
||||||
|
"version": "==7.1.2"
|
||||||
|
},
|
||||||
|
"fastapi": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:63c4592f5ef3edf30afa9a44fa7c6b7ccb20e0d3f68cd9eba07b44d552058dcb",
|
||||||
|
"sha256:98d8ea9591d8512fdadf255d2a8fa56515cdd8624dca4af369da73727409508e"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==0.63.0"
|
||||||
|
},
|
||||||
|
"gunicorn": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:e0a968b5ba15f8a328fdfd7ab1fcb5af4470c28aaf7e55df02a99bc13138e6e8"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==20.1.0"
|
||||||
|
},
|
||||||
|
"h11": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:36a3cb8c0a032f56e2da7084577878a035d3b61d104230d4bd49c0c6b555a9c6",
|
||||||
|
"sha256:47222cb6067e4a307d535814917cd98fd0a57b6788ce715755fa2b6c28b56042"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '3.6'",
|
||||||
|
"version": "==0.12.0"
|
||||||
|
},
|
||||||
|
"hiredis": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:04026461eae67fdefa1949b7332e488224eac9e8f2b5c58c98b54d29af22093e",
|
||||||
|
"sha256:04927a4c651a0e9ec11c68e4427d917e44ff101f761cd3b5bc76f86aaa431d27",
|
||||||
|
"sha256:07bbf9bdcb82239f319b1f09e8ef4bdfaec50ed7d7ea51a56438f39193271163",
|
||||||
|
"sha256:09004096e953d7ebd508cded79f6b21e05dff5d7361771f59269425108e703bc",
|
||||||
|
"sha256:0adea425b764a08270820531ec2218d0508f8ae15a448568109ffcae050fee26",
|
||||||
|
"sha256:0b39ec237459922c6544d071cdcf92cbb5bc6685a30e7c6d985d8a3e3a75326e",
|
||||||
|
"sha256:0d5109337e1db373a892fdcf78eb145ffb6bbd66bb51989ec36117b9f7f9b579",
|
||||||
|
"sha256:0f41827028901814c709e744060843c77e78a3aca1e0d6875d2562372fcb405a",
|
||||||
|
"sha256:11d119507bb54e81f375e638225a2c057dda748f2b1deef05c2b1a5d42686048",
|
||||||
|
"sha256:1233e303645f468e399ec906b6b48ab7cd8391aae2d08daadbb5cad6ace4bd87",
|
||||||
|
"sha256:139705ce59d94eef2ceae9fd2ad58710b02aee91e7fa0ccb485665ca0ecbec63",
|
||||||
|
"sha256:1f03d4dadd595f7a69a75709bc81902673fa31964c75f93af74feac2f134cc54",
|
||||||
|
"sha256:240ce6dc19835971f38caf94b5738092cb1e641f8150a9ef9251b7825506cb05",
|
||||||
|
"sha256:294a6697dfa41a8cba4c365dd3715abc54d29a86a40ec6405d677ca853307cfb",
|
||||||
|
"sha256:3d55e36715ff06cdc0ab62f9591607c4324297b6b6ce5b58cb9928b3defe30ea",
|
||||||
|
"sha256:3dddf681284fe16d047d3ad37415b2e9ccdc6c8986c8062dbe51ab9a358b50a5",
|
||||||
|
"sha256:3f5f7e3a4ab824e3de1e1700f05ad76ee465f5f11f5db61c4b297ec29e692b2e",
|
||||||
|
"sha256:508999bec4422e646b05c95c598b64bdbef1edf0d2b715450a078ba21b385bcc",
|
||||||
|
"sha256:5d2a48c80cf5a338d58aae3c16872f4d452345e18350143b3bf7216d33ba7b99",
|
||||||
|
"sha256:5dc7a94bb11096bc4bffd41a3c4f2b958257085c01522aa81140c68b8bf1630a",
|
||||||
|
"sha256:65d653df249a2f95673976e4e9dd7ce10de61cfc6e64fa7eeaa6891a9559c581",
|
||||||
|
"sha256:7492af15f71f75ee93d2a618ca53fea8be85e7b625e323315169977fae752426",
|
||||||
|
"sha256:7f0055f1809b911ab347a25d786deff5e10e9cf083c3c3fd2dd04e8612e8d9db",
|
||||||
|
"sha256:807b3096205c7cec861c8803a6738e33ed86c9aae76cac0e19454245a6bbbc0a",
|
||||||
|
"sha256:81d6d8e39695f2c37954d1011c0480ef7cf444d4e3ae24bc5e89ee5de360139a",
|
||||||
|
"sha256:87c7c10d186f1743a8fd6a971ab6525d60abd5d5d200f31e073cd5e94d7e7a9d",
|
||||||
|
"sha256:8b42c0dc927b8d7c0eb59f97e6e34408e53bc489f9f90e66e568f329bff3e443",
|
||||||
|
"sha256:a00514362df15af041cc06e97aebabf2895e0a7c42c83c21894be12b84402d79",
|
||||||
|
"sha256:a39efc3ade8c1fb27c097fd112baf09d7fd70b8cb10ef1de4da6efbe066d381d",
|
||||||
|
"sha256:a4ee8000454ad4486fb9f28b0cab7fa1cd796fc36d639882d0b34109b5b3aec9",
|
||||||
|
"sha256:a7928283143a401e72a4fad43ecc85b35c27ae699cf5d54d39e1e72d97460e1d",
|
||||||
|
"sha256:adf4dd19d8875ac147bf926c727215a0faf21490b22c053db464e0bf0deb0485",
|
||||||
|
"sha256:ae8427a5e9062ba66fc2c62fb19a72276cf12c780e8db2b0956ea909c48acff5",
|
||||||
|
"sha256:b4c8b0bc5841e578d5fb32a16e0c305359b987b850a06964bd5a62739d688048",
|
||||||
|
"sha256:b84f29971f0ad4adaee391c6364e6f780d5aae7e9226d41964b26b49376071d0",
|
||||||
|
"sha256:c39c46d9e44447181cd502a35aad2bb178dbf1b1f86cf4db639d7b9614f837c6",
|
||||||
|
"sha256:cb2126603091902767d96bcb74093bd8b14982f41809f85c9b96e519c7e1dc41",
|
||||||
|
"sha256:dcef843f8de4e2ff5e35e96ec2a4abbdf403bd0f732ead127bd27e51f38ac298",
|
||||||
|
"sha256:e3447d9e074abf0e3cd85aef8131e01ab93f9f0e86654db7ac8a3f73c63706ce",
|
||||||
|
"sha256:f52010e0a44e3d8530437e7da38d11fb822acfb0d5b12e9cd5ba655509937ca0",
|
||||||
|
"sha256:f8196f739092a78e4f6b1b2172679ed3343c39c61a3e9d722ce6fcf1dac2824a"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '3.6'",
|
||||||
|
"version": "==2.0.0"
|
||||||
|
},
|
||||||
|
"httptools": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:0a4b1b2012b28e68306575ad14ad5e9120b34fccd02a81eb08838d7e3bbb48be",
|
||||||
|
"sha256:3592e854424ec94bd17dc3e0c96a64e459ec4147e6d53c0a42d0ebcef9cb9c5d",
|
||||||
|
"sha256:41b573cf33f64a8f8f3400d0a7faf48e1888582b6f6e02b82b9bd4f0bf7497ce",
|
||||||
|
"sha256:56b6393c6ac7abe632f2294da53f30d279130a92e8ae39d8d14ee2e1b05ad1f2",
|
||||||
|
"sha256:86c6acd66765a934e8730bf0e9dfaac6fdcf2a4334212bd4a0a1c78f16475ca6",
|
||||||
|
"sha256:96da81e1992be8ac2fd5597bf0283d832287e20cb3cfde8996d2b00356d4e17f",
|
||||||
|
"sha256:96eb359252aeed57ea5c7b3d79839aaa0382c9d3149f7d24dd7172b1bcecb009",
|
||||||
|
"sha256:a2719e1d7a84bb131c4f1e0cb79705034b48de6ae486eb5297a139d6a3296dce",
|
||||||
|
"sha256:ac0aa11e99454b6a66989aa2d44bca41d4e0f968e395a0a8f164b401fefe359a",
|
||||||
|
"sha256:bc3114b9edbca5a1eb7ae7db698c669eb53eb8afbbebdde116c174925260849c",
|
||||||
|
"sha256:fa3cd71e31436911a44620473e873a256851e1f53dee56669dae403ba41756a4",
|
||||||
|
"sha256:fea04e126014169384dee76a153d4573d90d0cbd1d12185da089f73c78390437"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==0.1.1"
|
||||||
|
},
|
||||||
|
"idna": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6",
|
||||||
|
"sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
|
||||||
|
"version": "==2.10"
|
||||||
|
},
|
||||||
|
"kafka-python": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3",
|
||||||
|
"sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==2.0.2"
|
||||||
|
},
|
||||||
|
"pipfile": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:f7d9f15de8b660986557eb3cc5391aa1a16207ac41bc378d03f414762d36c984"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==0.0.2"
|
||||||
|
},
|
||||||
|
"pydantic": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:0c40162796fc8d0aa744875b60e4dc36834db9f2a25dbf9ba9664b1915a23850",
|
||||||
|
"sha256:20d42f1be7c7acc352b3d09b0cf505a9fab9deb93125061b376fbe1f06a5459f",
|
||||||
|
"sha256:2287ebff0018eec3cc69b1d09d4b7cebf277726fa1bd96b45806283c1d808683",
|
||||||
|
"sha256:258576f2d997ee4573469633592e8b99aa13bda182fcc28e875f866016c8e07e",
|
||||||
|
"sha256:26cf3cb2e68ec6c0cfcb6293e69fb3450c5fd1ace87f46b64f678b0d29eac4c3",
|
||||||
|
"sha256:2f2736d9a996b976cfdfe52455ad27462308c9d3d0ae21a2aa8b4cd1a78f47b9",
|
||||||
|
"sha256:3114d74329873af0a0e8004627f5389f3bb27f956b965ddd3e355fe984a1789c",
|
||||||
|
"sha256:3bbd023c981cbe26e6e21c8d2ce78485f85c2e77f7bab5ec15b7d2a1f491918f",
|
||||||
|
"sha256:3bcb9d7e1f9849a6bdbd027aabb3a06414abd6068cb3b21c49427956cce5038a",
|
||||||
|
"sha256:4bbc47cf7925c86a345d03b07086696ed916c7663cb76aa409edaa54546e53e2",
|
||||||
|
"sha256:6388ef4ef1435364c8cc9a8192238aed030595e873d8462447ccef2e17387125",
|
||||||
|
"sha256:830ef1a148012b640186bf4d9789a206c56071ff38f2460a32ae67ca21880eb8",
|
||||||
|
"sha256:8fbb677e4e89c8ab3d450df7b1d9caed23f254072e8597c33279460eeae59b99",
|
||||||
|
"sha256:c17a0b35c854049e67c68b48d55e026c84f35593c66d69b278b8b49e2484346f",
|
||||||
|
"sha256:dd4888b300769ecec194ca8f2699415f5f7760365ddbe243d4fd6581485fa5f0",
|
||||||
|
"sha256:dde4ca368e82791de97c2ec019681ffb437728090c0ff0c3852708cf923e0c7d",
|
||||||
|
"sha256:e3f8790c47ac42549dc8b045a67b0ca371c7f66e73040d0197ce6172b385e520",
|
||||||
|
"sha256:e8bc082afef97c5fd3903d05c6f7bb3a6af9fc18631b4cc9fedeb4720efb0c58",
|
||||||
|
"sha256:eb8ccf12295113ce0de38f80b25f736d62f0a8d87c6b88aca645f168f9c78771",
|
||||||
|
"sha256:fb77f7a7e111db1832ae3f8f44203691e15b1fa7e5a1cb9691d4e2659aee41c4",
|
||||||
|
"sha256:fbfb608febde1afd4743c6822c19060a8dbdd3eb30f98e36061ba4973308059e",
|
||||||
|
"sha256:fff29fe54ec419338c522b908154a2efabeee4f483e48990f87e189661f31ce3"
|
||||||
|
],
|
||||||
|
"markers": "python_full_version >= '3.6.1'",
|
||||||
|
"version": "==1.8.1"
|
||||||
|
},
|
||||||
|
"requests": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804",
|
||||||
|
"sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==2.25.1"
|
||||||
|
},
|
||||||
|
"starlette": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:bd2ffe5e37fb75d014728511f8e68ebf2c80b0fa3d04ca1479f4dc752ae31ac9",
|
||||||
|
"sha256:ebe8ee08d9be96a3c9f31b2cb2a24dbdf845247b745664bd8a3f9bd0c977fdbc"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '3.6'",
|
||||||
|
"version": "==0.13.6"
|
||||||
|
},
|
||||||
|
"toml": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
|
||||||
|
"sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
|
||||||
|
"version": "==0.10.2"
|
||||||
|
},
|
||||||
|
"typing-extensions": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918",
|
||||||
|
"sha256:99d4073b617d30288f569d3f13d2bd7548c3a7e4c8de87db09a9d29bb3a4a60c",
|
||||||
|
"sha256:dafc7639cde7f1b6e1acc0f457842a83e722ccca8eef5270af2d74792619a89f"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==3.7.4.3"
|
||||||
|
},
|
||||||
|
"urllib3": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:2f4da4594db7e1e110a944bb1b551fdf4e6c136ad42e4234131391e21eb5b0df",
|
||||||
|
"sha256:e7b021f7241115872f92f43c6508082facffbd1c048e3c6e2bb9c2a157e28937"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'",
|
||||||
|
"version": "==1.26.4"
|
||||||
|
},
|
||||||
|
"uvicorn": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:3292251b3c7978e8e4a7868f4baf7f7f7bb7e40c759ecc125c37e99cdea34202",
|
||||||
|
"sha256:7587f7b08bd1efd2b9bad809a3d333e972f1d11af8a5e52a9371ee3a5de71524"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==0.13.4"
|
||||||
|
},
|
||||||
|
"uvloop": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:114543c84e95df1b4ff546e6e3a27521580466a30127f12172a3278172ad68bc",
|
||||||
|
"sha256:19fa1d56c91341318ac5d417e7b61c56e9a41183946cc70c411341173de02c69",
|
||||||
|
"sha256:2bb0624a8a70834e54dde8feed62ed63b50bad7a1265c40d6403a2ac447bce01",
|
||||||
|
"sha256:42eda9f525a208fbc4f7cecd00fa15c57cc57646c76632b3ba2fe005004f051d",
|
||||||
|
"sha256:44cac8575bf168601424302045234d74e3561fbdbac39b2b54cc1d1d00b70760",
|
||||||
|
"sha256:6de130d0cb78985a5d080e323b86c5ecaf3af82f4890492c05981707852f983c",
|
||||||
|
"sha256:7ae39b11a5f4cec1432d706c21ecc62f9e04d116883178b09671aa29c46f7a47",
|
||||||
|
"sha256:90e56f17755e41b425ad19a08c41dc358fa7bf1226c0f8e54d4d02d556f7af7c",
|
||||||
|
"sha256:b45218c99795803fb8bdbc9435ff7f54e3a591b44cd4c121b02fa83affb61c7c",
|
||||||
|
"sha256:e5e5f855c9bf483ee6cd1eb9a179b740de80cb0ae2988e3fa22309b78e2ea0e7"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==0.15.2"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"develop": {}
|
||||||
|
}
|
2
handler_data/__init__.py
Normal file
2
handler_data/__init__.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
from .handler_user import HandlerUser
|
||||||
|
from .handler_event import HandlerEvent
|
7
handler_data/handler_event.py
Normal file
7
handler_data/handler_event.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
class HandlerEvent:
|
||||||
|
handler_link = []
|
||||||
|
|
||||||
|
def __init__(self, func):
|
||||||
|
HandlerEvent.handler_link.append(func)
|
||||||
|
|
||||||
|
|
20
handler_data/handler_user.py
Normal file
20
handler_data/handler_user.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
class HandlerUser:
|
||||||
|
handler_link = []
|
||||||
|
|
||||||
|
def __init__(self, func):
|
||||||
|
HandlerUser.handler_link.append(func)
|
||||||
|
|
||||||
|
|
||||||
|
@HandlerUser
|
||||||
|
async def device_label(rdb, item):
|
||||||
|
"""
|
||||||
|
标记新设备
|
||||||
|
:param rdb:
|
||||||
|
:param item:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
v = await rdb.execute('sadd', f'{item.game}.devices', item.properties.get('#device_id', ''))
|
||||||
|
if v:
|
||||||
|
item.properties['is_new_device'] = 1
|
||||||
|
else:
|
||||||
|
item.properties['is_new_device'] = 0
|
35
main.py
Normal file
35
main.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import uvicorn
|
||||||
|
from aioredis import create_redis_pool
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
from routers import point
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
from utils.ta_sdk import TGAnalytics, ToKafka
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
|
||||||
|
def register_redis(app: FastAPI) -> None:
|
||||||
|
@app.on_event('startup')
|
||||||
|
async def startup_event():
|
||||||
|
app.state.redis = await create_redis_pool(**settings.REDIS_CONF)
|
||||||
|
|
||||||
|
@app.on_event('shutdown')
|
||||||
|
async def shutdown_event():
|
||||||
|
app.state.redis.close()
|
||||||
|
await app.state.redis.wait_closed()
|
||||||
|
|
||||||
|
|
||||||
|
def register_ta(app: FastAPI) -> None:
|
||||||
|
@app.on_event('startup')
|
||||||
|
def startup_event():
|
||||||
|
app.state.ta = TGAnalytics(ToKafka(settings.KAFKA_CONF))
|
||||||
|
|
||||||
|
|
||||||
|
app.include_router(point.router, prefix='/v1')
|
||||||
|
register_redis(app)
|
||||||
|
register_ta(app)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
uvicorn.run(app='main:app', host="0.0.0.0", port=6666, reload=True, debug=True)
|
0
routers/__init__.py
Normal file
0
routers/__init__.py
Normal file
57
routers/point.py
Normal file
57
routers/point.py
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Request
|
||||||
|
from pydantic import BaseModel, validator
|
||||||
|
|
||||||
|
from handler_data import HandlerUser, HandlerEvent
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
class Item(BaseModel):
|
||||||
|
# sign = md5(distinct_id+account_id+act+ts+salt)
|
||||||
|
distinct_id: str
|
||||||
|
game: str
|
||||||
|
account_id: str
|
||||||
|
act: str
|
||||||
|
event_name: str = None
|
||||||
|
properties: dict
|
||||||
|
ts: int
|
||||||
|
sign: str
|
||||||
|
|
||||||
|
@validator('sign')
|
||||||
|
def sign_validator(cls, v: str, values: dict):
|
||||||
|
s = f'{values.get("distinct_id")}{values.get("account_id", "")}{values.get("act", "")}{values.get("ts", "")}{settings.SALT}'
|
||||||
|
if hashlib.md5(s.encode()).hexdigest() == v:
|
||||||
|
return v
|
||||||
|
raise ValueError(f'打击违法犯罪行为{hashlib.md5(s.encode()).hexdigest()}')
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/point/")
|
||||||
|
async def point(request: Request, item: Item):
|
||||||
|
ta = getattr(request.app.state.ta, item.act)
|
||||||
|
# 将不同游戏发送到不同 topic_name
|
||||||
|
request.app.state.ta.consumer.topic_name = item.game
|
||||||
|
|
||||||
|
rdb = request.app.state.redis
|
||||||
|
if ta and item.event_name and item.act == 'track':
|
||||||
|
await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerEvent.handler_link))
|
||||||
|
await track(ta, item)
|
||||||
|
else:
|
||||||
|
await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerUser.handler_link))
|
||||||
|
await user_set(ta, item)
|
||||||
|
results = {"code": 0, 'msg': 'ok'}
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
async def track(ta, item):
|
||||||
|
ta(item.distinct_id, item.account_id, item.event_name, item.properties)
|
||||||
|
|
||||||
|
|
||||||
|
async def user_set(ta, item):
|
||||||
|
ta(item.distinct_id, item.account_id, item.properties)
|
||||||
|
|
||||||
|
|
||||||
|
|
25
settings.py
Normal file
25
settings.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
REDIS_CONF = {
|
||||||
|
'address': ('192.168.0.161', 6379),
|
||||||
|
'password': 'd1Gh*zp5',
|
||||||
|
'db': 1
|
||||||
|
}
|
||||||
|
SALT = '0r4X00mH'
|
||||||
|
KAFKA_CONF = {
|
||||||
|
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||||
|
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class Debug(Config):
|
||||||
|
REDIS_CONF = {
|
||||||
|
'address': ('192.168.0.161', 6379),
|
||||||
|
'password': 'd1Gh*zp5',
|
||||||
|
'db': 0
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
settings = Debug
|
0
utils/__init__.py
Normal file
0
utils/__init__.py
Normal file
930
utils/ta_sdk.py
Normal file
930
utils/ta_sdk.py
Normal file
@ -0,0 +1,930 @@
|
|||||||
|
# encoding:utf-8
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import gzip
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
import re
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from kafka import KafkaProducer
|
||||||
|
from requests import ConnectionError
|
||||||
|
|
||||||
|
try:
|
||||||
|
import queue
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
except ImportError:
|
||||||
|
import Queue as queue
|
||||||
|
from urlparse import urlparse
|
||||||
|
try:
|
||||||
|
isinstance("", basestring)
|
||||||
|
|
||||||
|
|
||||||
|
def is_str(s):
|
||||||
|
return isinstance(s, basestring)
|
||||||
|
except NameError:
|
||||||
|
def is_str(s):
|
||||||
|
return isinstance(s, str)
|
||||||
|
try:
|
||||||
|
isinstance(1, long)
|
||||||
|
|
||||||
|
|
||||||
|
def is_int(n):
|
||||||
|
return isinstance(n, int) or isinstance(n, long)
|
||||||
|
except NameError:
|
||||||
|
def is_int(n):
|
||||||
|
return isinstance(n, int)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
ROTATE_MODE = Enum('ROTATE_MODE', ('DAILY', 'HOURLY'))
|
||||||
|
except ImportError:
|
||||||
|
class ROTATE_MODE(object):
|
||||||
|
DAILY = 0
|
||||||
|
HOURLY = 1
|
||||||
|
|
||||||
|
|
||||||
|
class TGAException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TGAIllegalDataException(TGAException):
|
||||||
|
"""数据格式异常
|
||||||
|
|
||||||
|
在发送的数据格式有误时,SDK 会抛出此异常,用户应当捕获并处理.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TGANetworkException(TGAException):
|
||||||
|
"""网络异常
|
||||||
|
|
||||||
|
在因为网络或者不可预知的问题导致数据无法发送时,SDK会抛出此异常,用户应当捕获并处理.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
__version__ = '1.6.0'
|
||||||
|
|
||||||
|
|
||||||
|
class TGAnalytics(object):
|
||||||
|
"""TGAnalytics 实例是发送事件数据和用户属性数据的关键实例
|
||||||
|
"""
|
||||||
|
|
||||||
|
__NAME_PATTERN = re.compile(r"^(#[a-z][a-z0-9_]{0,49})|([a-z][a-z0-9_]{0,50})$", re.I)
|
||||||
|
|
||||||
|
def __init__(self, consumer, enable_uuid=False):
|
||||||
|
"""创建一个 TGAnalytics 实例
|
||||||
|
|
||||||
|
TGAanlytics 需要与指定的 Consumer 一起使用,可以使用以下任何一种:
|
||||||
|
- LoggingConsumer: 批量实时写本地文件,并与 LogBus 搭配
|
||||||
|
- BatchConsumer: 批量实时地向TA服务器传输数据(同步阻塞),不需要搭配传输工具
|
||||||
|
- AsyncBatchConsumer: 批量实时地向TA服务器传输数据(异步非阻塞),不需要搭配传输工具
|
||||||
|
- DebugConsumer: 逐条发送数据,并对数据格式做严格校验
|
||||||
|
|
||||||
|
Args:
|
||||||
|
consumer: 指定的 Consumer
|
||||||
|
"""
|
||||||
|
self.__consumer = consumer
|
||||||
|
self.__enableUuid = enable_uuid
|
||||||
|
self.__super_properties = {}
|
||||||
|
self.clear_super_properties()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def consumer(self):
|
||||||
|
"""
|
||||||
|
用了更换 kafka topic_name
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
return self.__consumer
|
||||||
|
|
||||||
|
def user_set(self, distinct_id=None, account_id=None, properties=None):
|
||||||
|
"""设置用户属性
|
||||||
|
|
||||||
|
对于一般的用户属性,您可以调用 user_set 来进行设置。使用该接口上传的属性将会覆盖原有的属性值,如果之前不存在该用户属性,
|
||||||
|
则会新建该用户属性,类型与传入属性的类型一致.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
properties: dict 类型的用户属性
|
||||||
|
"""
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_set', properties_add=properties)
|
||||||
|
|
||||||
|
def user_unset(self, distinct_id=None, account_id=None, properties=None):
|
||||||
|
"""
|
||||||
|
删除某个用户的用户属性
|
||||||
|
:param distinct_id:
|
||||||
|
:param account_id:
|
||||||
|
:param properties:
|
||||||
|
"""
|
||||||
|
if isinstance(properties, list):
|
||||||
|
properties = dict((key, 0) for key in properties)
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_unset', properties_add=properties)
|
||||||
|
|
||||||
|
def user_setOnce(self, distinct_id=None, account_id=None, properties=None):
|
||||||
|
"""设置用户属性, 不覆盖已存在的用户属性
|
||||||
|
|
||||||
|
如果您要上传的用户属性只要设置一次,则可以调用 user_setOnce 来进行设置,当该属性之前已经有值的时候,将会忽略这条信息.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
properties: dict 类型的用户属性
|
||||||
|
"""
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_setOnce', properties_add=properties)
|
||||||
|
|
||||||
|
def user_add(self, distinct_id=None, account_id=None, properties=None):
|
||||||
|
"""对指定的数值类型的用户属性进行累加操作
|
||||||
|
|
||||||
|
当您要上传数值型的属性时,您可以调用 user_add 来对该属性进行累加操作. 如果该属性还未被设置,则会赋值0后再进行计算.
|
||||||
|
可传入负值,等同于相减操作.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
properties: 数值类型的用户属性
|
||||||
|
"""
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_add', properties_add=properties)
|
||||||
|
|
||||||
|
def user_append(self, distinct_id=None, account_id=None, properties=None):
|
||||||
|
"""追加一个用户的某一个或者多个集合类型
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
properties: 集合
|
||||||
|
"""
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_append', properties_add=properties)
|
||||||
|
|
||||||
|
def user_del(self, distinct_id=None, account_id=None):
|
||||||
|
"""删除用户
|
||||||
|
|
||||||
|
如果您要删除某个用户,可以调用 user_del 将该用户删除。调用此函数后,将无法再查询该用户的用户属性, 但该用户产生的事件仍然可以被查询到.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
"""
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_del')
|
||||||
|
|
||||||
|
def track(self, distinct_id=None, account_id=None, event_name=None, properties=None):
|
||||||
|
"""发送事件数据
|
||||||
|
|
||||||
|
您可以调用 track 来上传事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”,
|
||||||
|
长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
event_name: 事件名称
|
||||||
|
properties: 事件属性
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TGAIllegalDataException: 数据格式错误时会抛出此异常
|
||||||
|
"""
|
||||||
|
all_properties = self._public_track_add(event_name)
|
||||||
|
|
||||||
|
if properties:
|
||||||
|
all_properties.update(properties)
|
||||||
|
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track', event_name=event_name,
|
||||||
|
properties_add=all_properties)
|
||||||
|
|
||||||
|
def track_update(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None):
|
||||||
|
"""发送可更新的事件数据
|
||||||
|
|
||||||
|
您可以调用 track_update 来上传可更新的事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”,
|
||||||
|
长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
event_name: 事件名称
|
||||||
|
event_id: 事件唯一ID
|
||||||
|
properties: 事件属性
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TGAIllegalDataException: 数据格式错误时会抛出此异常
|
||||||
|
"""
|
||||||
|
all_properties = self._public_track_add(event_name)
|
||||||
|
|
||||||
|
if properties:
|
||||||
|
all_properties.update(properties)
|
||||||
|
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_update', event_name=event_name,
|
||||||
|
event_id=event_id, properties_add=all_properties)
|
||||||
|
|
||||||
|
def track_overwrite(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None):
|
||||||
|
"""发送可覆盖的事件数据
|
||||||
|
|
||||||
|
您可以调用 track_overwrite 来上传可全部覆盖的事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”,
|
||||||
|
长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
distinct_id: 访客 ID
|
||||||
|
account_id: 账户 ID
|
||||||
|
event_name: 事件名称
|
||||||
|
event_id: 事件唯一ID
|
||||||
|
properties: 事件属性
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TGAIllegalDataException: 数据格式错误时会抛出此异常
|
||||||
|
"""
|
||||||
|
all_properties = self._public_track_add(event_name)
|
||||||
|
|
||||||
|
if properties:
|
||||||
|
all_properties.update(properties)
|
||||||
|
|
||||||
|
self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_overwrite', event_name=event_name,
|
||||||
|
event_id=event_id, properties_add=all_properties)
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
"""立即提交数据到相应的接收端
|
||||||
|
"""
|
||||||
|
self.__consumer.flush()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""关闭并退出 sdk
|
||||||
|
|
||||||
|
请在退出前调用本接口,以避免缓存内的数据丢失
|
||||||
|
"""
|
||||||
|
self.__consumer.close()
|
||||||
|
|
||||||
|
def _public_track_add(self, event_name):
|
||||||
|
if not is_str(event_name):
|
||||||
|
raise TGAIllegalDataException('a string type event_name is required for track')
|
||||||
|
|
||||||
|
all_properties = {
|
||||||
|
'#lib': 'tga_python_sdk',
|
||||||
|
'#lib_version': __version__,
|
||||||
|
}
|
||||||
|
all_properties.update(self.__super_properties)
|
||||||
|
return all_properties
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __add(self, distinct_id, account_id, send_type, event_name=None, event_id=None, properties_add=None):
|
||||||
|
if distinct_id is None and account_id is None:
|
||||||
|
raise TGAException("Distinct_id and account_id must be set at least one")
|
||||||
|
|
||||||
|
if properties_add:
|
||||||
|
properties = properties_add.copy()
|
||||||
|
else:
|
||||||
|
properties = {}
|
||||||
|
data = {
|
||||||
|
'#type': send_type
|
||||||
|
}
|
||||||
|
if "#ip" in properties.keys():
|
||||||
|
data['#ip'] = properties.get("#ip")
|
||||||
|
del (properties['#ip'])
|
||||||
|
if "#first_check_id" in properties.keys():
|
||||||
|
data['#first_check_id'] = properties.get("#first_check_id")
|
||||||
|
del (properties['#first_check_id'])
|
||||||
|
# 只支持UUID标准格式xxxxxxxx - xxxx - xxxx - xxxx - xxxxxxxxxxxx
|
||||||
|
if "#uuid" in properties.keys():
|
||||||
|
data['#uuid'] = str(properties['#uuid'])
|
||||||
|
del (properties['#uuid'])
|
||||||
|
elif self.__enableUuid:
|
||||||
|
data['#uuid'] = str(uuid.uuid1())
|
||||||
|
if "#app_id" in properties.keys():
|
||||||
|
data['#app_id'] = properties.get("#app_id")
|
||||||
|
del (properties['#app_id'])
|
||||||
|
|
||||||
|
self.__assert_properties(send_type, properties)
|
||||||
|
td_time = properties.get("#time")
|
||||||
|
data['#time'] = td_time
|
||||||
|
del (properties['#time'])
|
||||||
|
|
||||||
|
data['properties'] = properties
|
||||||
|
|
||||||
|
if event_name is not None:
|
||||||
|
data['#event_name'] = event_name
|
||||||
|
if event_id is not None:
|
||||||
|
data['#event_id'] = event_id
|
||||||
|
if distinct_id is not None:
|
||||||
|
data['#distinct_id'] = distinct_id
|
||||||
|
if account_id is not None:
|
||||||
|
data['#account_id'] = account_id
|
||||||
|
|
||||||
|
self.__consumer.add(json.dumps(data))
|
||||||
|
|
||||||
|
def __assert_properties(self, action_type, properties):
|
||||||
|
if properties is not None:
|
||||||
|
if "#time" not in properties.keys():
|
||||||
|
properties['#time'] = datetime.datetime.now()
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
time_temp = properties.get('#time')
|
||||||
|
if isinstance(time_temp, datetime.datetime) or isinstance(time_temp, datetime.date):
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise TGAIllegalDataException('Value of #time should be datetime.datetime or datetime.date')
|
||||||
|
except Exception as e:
|
||||||
|
raise TGAIllegalDataException(e)
|
||||||
|
|
||||||
|
for key, value in properties.items():
|
||||||
|
if not is_str(key):
|
||||||
|
raise TGAIllegalDataException("Property key must be a str. [key=%s]" % str(key))
|
||||||
|
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not self.__NAME_PATTERN.match(key):
|
||||||
|
raise TGAIllegalDataException(
|
||||||
|
"type[%s] property key must be a valid variable name. [key=%s]" % (action_type, str(key)))
|
||||||
|
|
||||||
|
if not is_str(value) and not is_int(value) and not isinstance(value, float) \
|
||||||
|
and not isinstance(value, bool) \
|
||||||
|
and not isinstance(value, datetime.datetime) and not isinstance(value, datetime.date) \
|
||||||
|
and not isinstance(value, list):
|
||||||
|
raise TGAIllegalDataException(
|
||||||
|
"property value must be a str/int/float/bool/datetime/date/list. [value=%s]" % type(value))
|
||||||
|
|
||||||
|
if 'user_add' == action_type.lower() and not self.__number(value) and not key.startswith('#'):
|
||||||
|
raise TGAIllegalDataException('user_add properties must be number type')
|
||||||
|
|
||||||
|
if isinstance(value, datetime.datetime):
|
||||||
|
properties[key] = value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
||||||
|
elif isinstance(value, datetime.date):
|
||||||
|
properties[key] = value.strftime('%Y-%m-%d')
|
||||||
|
if isinstance(value, list):
|
||||||
|
i = 0
|
||||||
|
for lvalue in value:
|
||||||
|
if isinstance(lvalue, datetime.datetime):
|
||||||
|
value[i] = lvalue.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
def __number(self, s):
|
||||||
|
if is_int(s):
|
||||||
|
return True
|
||||||
|
if isinstance(s, float):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def clear_super_properties(self):
|
||||||
|
"""删除所有已设置的事件公共属性
|
||||||
|
"""
|
||||||
|
self.__super_properties = {
|
||||||
|
'#lib': 'tga_python_sdk',
|
||||||
|
'#lib_version': __version__,
|
||||||
|
}
|
||||||
|
|
||||||
|
def set_super_properties(self, super_properties):
|
||||||
|
"""设置公共事件属性
|
||||||
|
|
||||||
|
公共事件属性是所有事件中的属性属性,建议您在发送事件前,先设置公共事件属性. 当 track 的 properties 和
|
||||||
|
super properties 有相同的 key 时,track 的 properties 会覆盖公共事件属性的值.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
super_properties 公共属性
|
||||||
|
"""
|
||||||
|
self.__super_properties.update(super_properties)
|
||||||
|
|
||||||
|
|
||||||
|
if os.name == 'nt':
|
||||||
|
import msvcrt
|
||||||
|
|
||||||
|
|
||||||
|
def _lock(file_):
|
||||||
|
try:
|
||||||
|
save_pos = file_.tell()
|
||||||
|
file_.seek(0)
|
||||||
|
try:
|
||||||
|
msvcrt.locking(file_.fileno(), msvcrt.LK_LOCK, 1)
|
||||||
|
except IOError as e:
|
||||||
|
raise TGAException(e)
|
||||||
|
finally:
|
||||||
|
if save_pos:
|
||||||
|
file_.seek(save_pos)
|
||||||
|
except IOError as e:
|
||||||
|
raise TGAException(e)
|
||||||
|
|
||||||
|
|
||||||
|
def _unlock(file_):
|
||||||
|
try:
|
||||||
|
save_pos = file_.tell()
|
||||||
|
if save_pos:
|
||||||
|
file_.seek(0)
|
||||||
|
try:
|
||||||
|
msvcrt.locking(file_.fileno(), msvcrt.LK_UNLCK, 1)
|
||||||
|
except IOError as e:
|
||||||
|
raise TGAException(e)
|
||||||
|
finally:
|
||||||
|
if save_pos:
|
||||||
|
file_.seek(save_pos)
|
||||||
|
except IOError as e:
|
||||||
|
raise TGAException(e)
|
||||||
|
elif os.name == 'posix':
|
||||||
|
import fcntl
|
||||||
|
|
||||||
|
|
||||||
|
def _lock(file_):
|
||||||
|
try:
|
||||||
|
fcntl.flock(file_.fileno(), fcntl.LOCK_EX)
|
||||||
|
except IOError as e:
|
||||||
|
raise TGAException(e)
|
||||||
|
|
||||||
|
|
||||||
|
def _unlock(file_):
|
||||||
|
fcntl.flock(file_.fileno(), fcntl.LOCK_UN)
|
||||||
|
else:
|
||||||
|
raise TGAException("Python SDK is defined for NT and POSIX system.")
|
||||||
|
|
||||||
|
|
||||||
|
class _TAFileLock(object):
|
||||||
|
def __init__(self, file_handler):
|
||||||
|
self._file_handler = file_handler
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
_lock(self._file_handler)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, t, v, tb):
|
||||||
|
_unlock(self._file_handler)
|
||||||
|
|
||||||
|
|
||||||
|
class LoggingConsumer(object):
|
||||||
|
"""数据批量实时写入本地文件
|
||||||
|
|
||||||
|
创建指定文件存放目录的 LoggingConsumer, 将数据使用 logging 库输出到指定路径. 同时,需将 LogBus 的监听文件夹地址
|
||||||
|
设置为此处的地址,即可使用LogBus进行数据的监听上传.
|
||||||
|
"""
|
||||||
|
_mutex = queue.Queue()
|
||||||
|
_mutex.put(1)
|
||||||
|
|
||||||
|
class _FileWriter(object):
|
||||||
|
_writers = {}
|
||||||
|
_writeMutex = queue.Queue()
|
||||||
|
_writeMutex.put(1)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def instance(cls, filename):
|
||||||
|
cls._writeMutex.get(block=True, timeout=None)
|
||||||
|
try:
|
||||||
|
if filename in cls._writers.keys():
|
||||||
|
result = cls._writers[filename]
|
||||||
|
result._count = result._count + 1
|
||||||
|
else:
|
||||||
|
result = cls(filename)
|
||||||
|
cls._writers[filename] = result
|
||||||
|
return result
|
||||||
|
finally:
|
||||||
|
cls._writeMutex.put(1)
|
||||||
|
|
||||||
|
def __init__(self, filename):
|
||||||
|
self._filename = filename
|
||||||
|
self._file = open(self._filename, 'a')
|
||||||
|
self._count = 1
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
LoggingConsumer._FileWriter._writeMutex.get(block=True, timeout=None)
|
||||||
|
try:
|
||||||
|
self._count = self._count - 1
|
||||||
|
if self._count == 0:
|
||||||
|
self._file.close()
|
||||||
|
del LoggingConsumer._FileWriter._writers[self._filename]
|
||||||
|
finally:
|
||||||
|
LoggingConsumer._FileWriter._writeMutex.put(1)
|
||||||
|
|
||||||
|
def is_valid(self, filename):
|
||||||
|
return self._filename == filename
|
||||||
|
|
||||||
|
def write(self, messages):
|
||||||
|
with _TAFileLock(self._file):
|
||||||
|
for message in messages:
|
||||||
|
self._file.write(message)
|
||||||
|
self._file.write('\n')
|
||||||
|
self._file.flush()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def construct_filename(cls, directory, date_suffix, file_size, file_prefix):
|
||||||
|
filename = file_prefix + ".log." + date_suffix \
|
||||||
|
if file_prefix is not None else "log." + date_suffix
|
||||||
|
|
||||||
|
if file_size > 0:
|
||||||
|
count = 0
|
||||||
|
file_path = directory + filename + "_" + str(count)
|
||||||
|
while os.path.exists(file_path) and cls.file_size_out(file_path, file_size):
|
||||||
|
count = count + 1
|
||||||
|
file_path = directory + filename + "_" + str(count)
|
||||||
|
return file_path
|
||||||
|
else:
|
||||||
|
return directory + filename
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def file_size_out(cls, file_path, file_size):
|
||||||
|
fsize = os.path.getsize(file_path)
|
||||||
|
fsize = fsize / float(1024 * 1024)
|
||||||
|
if fsize >= file_size:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unlock_logging_consumer(cls):
|
||||||
|
cls._mutex.put(1)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def lock_logging_consumer(cls):
|
||||||
|
cls._mutex.get(block=True, timeout=None)
|
||||||
|
|
||||||
|
def __init__(self, log_directory, log_size=0, buffer_size=8192, rotate_mode=ROTATE_MODE.DAILY, file_prefix=None):
|
||||||
|
"""创建指定日志文件目录的 LoggingConsumer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
log_directory: 日志保存目录
|
||||||
|
log_size: 单个日志文件的大小, 单位 MB, log_size <= 0 表示不限制单个文件大小
|
||||||
|
buffer_size: 每次写入文件的大小, 单位 Byte, 默认 8K
|
||||||
|
rotate_mode: 日志切分模式,默认按天切分
|
||||||
|
"""
|
||||||
|
if not os.path.exists(log_directory):
|
||||||
|
os.makedirs(log_directory)
|
||||||
|
self.log_directory = log_directory # log文件保存的目录
|
||||||
|
self.sdf = '%Y-%m-%d-%H' if rotate_mode == ROTATE_MODE.HOURLY else '%Y-%m-%d'
|
||||||
|
self.suffix = datetime.datetime.now().strftime(self.sdf)
|
||||||
|
self._fileSize = log_size # 单个log文件的大小
|
||||||
|
if not self.log_directory.endswith("/"):
|
||||||
|
self.log_directory = self.log_directory + "/"
|
||||||
|
|
||||||
|
self._buffer = []
|
||||||
|
self._buffer_size = buffer_size
|
||||||
|
self._file_prefix = file_prefix
|
||||||
|
self.lock_logging_consumer()
|
||||||
|
filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize,
|
||||||
|
self._file_prefix)
|
||||||
|
self._writer = LoggingConsumer._FileWriter.instance(filename)
|
||||||
|
self.unlock_logging_consumer()
|
||||||
|
|
||||||
|
def add(self, msg):
|
||||||
|
messages = None
|
||||||
|
self.lock_logging_consumer()
|
||||||
|
self._buffer.append(msg)
|
||||||
|
if len(self._buffer) > self._buffer_size:
|
||||||
|
messages = self._buffer
|
||||||
|
date_suffix = datetime.datetime.now().strftime(self.sdf)
|
||||||
|
if self.suffix != date_suffix:
|
||||||
|
self.suffix = date_suffix
|
||||||
|
|
||||||
|
filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize,
|
||||||
|
self._file_prefix)
|
||||||
|
if not self._writer.is_valid(filename):
|
||||||
|
self._writer.close()
|
||||||
|
self._writer = LoggingConsumer._FileWriter.instance(filename)
|
||||||
|
self._buffer = []
|
||||||
|
if messages:
|
||||||
|
self._writer.write(messages)
|
||||||
|
self.unlock_logging_consumer()
|
||||||
|
|
||||||
|
def flush_with_close(self, is_close):
|
||||||
|
messages = None
|
||||||
|
self.lock_logging_consumer()
|
||||||
|
if len(self._buffer) > 0:
|
||||||
|
messages = self._buffer
|
||||||
|
filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize,
|
||||||
|
self._file_prefix)
|
||||||
|
if not self._writer.is_valid(filename):
|
||||||
|
self._writer.close()
|
||||||
|
self._writer = LoggingConsumer._FileWriter.instance(filename)
|
||||||
|
self._buffer = []
|
||||||
|
if messages:
|
||||||
|
self._writer.write(messages)
|
||||||
|
if is_close:
|
||||||
|
self._writer.close()
|
||||||
|
self.unlock_logging_consumer()
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
self.flush_with_close(False)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.flush_with_close(True)
|
||||||
|
|
||||||
|
|
||||||
|
class BatchConsumer(object):
|
||||||
|
"""同步、批量地向 TA 服务器传输数据
|
||||||
|
|
||||||
|
通过指定接收端地址和 APP ID,可以同步的向 TA 服务器传输数据. 此 Consumer 不需要搭配传输工具,
|
||||||
|
但是存在网络不稳定等原因造成数据丢失的可能,因此不建议在生产环境中使用.
|
||||||
|
|
||||||
|
触发上报的时机为以下条件满足其中之一的时候:
|
||||||
|
1. 数据条数大于预定义的最大值, 默认为 20 条
|
||||||
|
2. 数据发送间隔超过预定义的最大时间, 默认为 3 秒
|
||||||
|
"""
|
||||||
|
_batchlock = threading.RLock()
|
||||||
|
_cachelock = threading.RLock()
|
||||||
|
|
||||||
|
def __init__(self, server_uri, appid, batch=20, timeout=30000, interval=3, compress=True, maxCacheSize=50):
|
||||||
|
"""创建 BatchConsumer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
server_uri: 服务器的 URL 地址
|
||||||
|
appid: 项目的 APP ID
|
||||||
|
batch: 指定触发上传的数据条数, 默认为 20 条, 最大 200 条
|
||||||
|
timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms
|
||||||
|
interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3 秒
|
||||||
|
"""
|
||||||
|
self.__interval = interval
|
||||||
|
self.__batch = min(batch, 200)
|
||||||
|
self.__message_channel = []
|
||||||
|
self.__maxCacheSize = maxCacheSize
|
||||||
|
self.__cache_buffer = []
|
||||||
|
self.__last_flush = time.time()
|
||||||
|
server_url = urlparse(server_uri)
|
||||||
|
self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, timeout)
|
||||||
|
self.__http_service.compress = compress
|
||||||
|
|
||||||
|
def add(self, msg):
|
||||||
|
self._batchlock.acquire()
|
||||||
|
try:
|
||||||
|
self.__message_channel.append(msg)
|
||||||
|
finally:
|
||||||
|
self._batchlock.release()
|
||||||
|
if len(self.__message_channel) >= self.__batch \
|
||||||
|
or len(self.__cache_buffer) > 0:
|
||||||
|
self.flush_once()
|
||||||
|
|
||||||
|
def flush(self, throw_exception=True):
|
||||||
|
while len(self.__cache_buffer) > 0 or len(self.__message_channel) > 0:
|
||||||
|
try:
|
||||||
|
self.flush_once(throw_exception)
|
||||||
|
except TGAIllegalDataException:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def flush_once(self, throw_exception=True):
|
||||||
|
if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._cachelock.acquire()
|
||||||
|
self._batchlock.acquire()
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0:
|
||||||
|
return
|
||||||
|
if len(self.__cache_buffer) == 0 or len(self.__message_channel) >= self.__batch:
|
||||||
|
self.__cache_buffer.append(self.__message_channel)
|
||||||
|
self.__message_channel = []
|
||||||
|
finally:
|
||||||
|
self._batchlock.release()
|
||||||
|
msg = self.__cache_buffer[0]
|
||||||
|
self.__http_service.send('[' + ','.join(msg) + ']', str(len(msg)))
|
||||||
|
self.__last_flush = time.time()
|
||||||
|
self.__cache_buffer = self.__cache_buffer[1:]
|
||||||
|
except TGANetworkException as e:
|
||||||
|
if throw_exception:
|
||||||
|
raise e
|
||||||
|
except TGAIllegalDataException as e:
|
||||||
|
self.__cache_buffer = self.__cache_buffer[1:]
|
||||||
|
if throw_exception:
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
self._cachelock.release()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.flush()
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncBatchConsumer(object):
|
||||||
|
"""异步、批量地向 TA 服务器发送数据的
|
||||||
|
|
||||||
|
AsyncBatchConsumer 使用独立的线程进行数据发送,当满足以下两个条件之一时触发数据上报:
|
||||||
|
1. 数据条数大于预定义的最大值, 默认为 20 条
|
||||||
|
2. 数据发送间隔超过预定义的最大时间, 默认为 3 秒
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, server_uri, appid, interval=3, flush_size=20, queue_size=100000):
|
||||||
|
"""创建 AsyncBatchConsumer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
server_uri: 服务器的 URL 地址
|
||||||
|
appid: 项目的 APP ID
|
||||||
|
interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3 秒
|
||||||
|
flush_size: 队列缓存的阈值,超过此值将立即进行发送
|
||||||
|
queue_size: 缓存队列的大小
|
||||||
|
"""
|
||||||
|
server_url = urlparse(server_uri)
|
||||||
|
self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, 30000)
|
||||||
|
self.__batch = flush_size
|
||||||
|
self.__queue = queue.Queue(queue_size)
|
||||||
|
|
||||||
|
# 初始化发送线程
|
||||||
|
self.__flushing_thread = self._AsyncFlushThread(self, interval)
|
||||||
|
self.__flushing_thread.daemon = True
|
||||||
|
self.__flushing_thread.start()
|
||||||
|
|
||||||
|
def add(self, msg):
|
||||||
|
try:
|
||||||
|
self.__queue.put_nowait(msg)
|
||||||
|
except queue.Full as e:
|
||||||
|
raise TGANetworkException(e)
|
||||||
|
|
||||||
|
if self.__queue.qsize() > self.__batch:
|
||||||
|
self.flush()
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
self.__flushing_thread.flush()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.__flushing_thread.stop()
|
||||||
|
while not self.__queue.empty():
|
||||||
|
self._perform_request()
|
||||||
|
|
||||||
|
def _perform_request(self):
|
||||||
|
"""同步的发送数据
|
||||||
|
|
||||||
|
仅用于内部调用, 用户不应当调用此方法.
|
||||||
|
"""
|
||||||
|
flush_buffer = []
|
||||||
|
while len(flush_buffer) < self.__batch:
|
||||||
|
try:
|
||||||
|
flush_buffer.append(str(self.__queue.get_nowait()))
|
||||||
|
except queue.Empty:
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(flush_buffer) > 0:
|
||||||
|
for i in range(3): # 网络异常情况下重试 3 次
|
||||||
|
try:
|
||||||
|
self.__http_service.send('[' + ','.join(flush_buffer) + ']', str(len(flush_buffer)))
|
||||||
|
return True
|
||||||
|
except TGANetworkException:
|
||||||
|
pass
|
||||||
|
except TGAIllegalDataException:
|
||||||
|
break
|
||||||
|
|
||||||
|
class _AsyncFlushThread(threading.Thread):
|
||||||
|
def __init__(self, consumer, interval):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self._consumer = consumer
|
||||||
|
self._interval = interval
|
||||||
|
|
||||||
|
self._stop_event = threading.Event()
|
||||||
|
self._finished_event = threading.Event()
|
||||||
|
self._flush_event = threading.Event()
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
self._flush_event.set()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止线程
|
||||||
|
|
||||||
|
退出时需调用此方法,以保证线程安全结束.
|
||||||
|
"""
|
||||||
|
self._stop_event.set()
|
||||||
|
self._finished_event.wait()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
# 如果 _flush_event 标志位为 True,或者等待超过 _interval 则继续执行
|
||||||
|
self._flush_event.wait(self._interval)
|
||||||
|
self._consumer._perform_request()
|
||||||
|
self._flush_event.clear()
|
||||||
|
|
||||||
|
# 发现 stop 标志位时安全退出
|
||||||
|
if self._stop_event.isSet():
|
||||||
|
break
|
||||||
|
self._finished_event.set()
|
||||||
|
|
||||||
|
|
||||||
|
def _gzip_string(data):
|
||||||
|
try:
|
||||||
|
return gzip.compress(data)
|
||||||
|
except AttributeError:
|
||||||
|
import StringIO
|
||||||
|
buf = StringIO.StringIO()
|
||||||
|
fd = gzip.GzipFile(fileobj=buf, mode="w")
|
||||||
|
fd.write(data)
|
||||||
|
fd.close()
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
class _HttpServices(object):
|
||||||
|
"""内部类,用于发送网络请求
|
||||||
|
|
||||||
|
指定接收端地址和项目 APP ID, 实现向接收端上传数据的接口. 发送前将数据默认使用 Gzip 压缩,
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, server_uri, appid, timeout=30000):
|
||||||
|
self.url = server_uri
|
||||||
|
self.appid = appid
|
||||||
|
self.timeout = timeout
|
||||||
|
self.compress = True
|
||||||
|
|
||||||
|
def send(self, data, length):
|
||||||
|
"""使用 Requests 发送数据给服务器
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: 待发送的数据
|
||||||
|
length
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TGAIllegalDataException: 数据错误
|
||||||
|
TGANetworkException: 网络错误
|
||||||
|
"""
|
||||||
|
headers = {'appid': self.appid, 'TA-Integration-Type': 'python-sdk', 'TA-Integration-Version': __version__,
|
||||||
|
'TA-Integration-Count': length}
|
||||||
|
try:
|
||||||
|
compress_type = 'gzip'
|
||||||
|
if self.compress:
|
||||||
|
data = _gzip_string(data.encode("utf-8"))
|
||||||
|
else:
|
||||||
|
compress_type = 'none'
|
||||||
|
data = data.encode("utf-8")
|
||||||
|
headers['compress'] = compress_type
|
||||||
|
response = requests.post(self.url, data=data, headers=headers, timeout=self.timeout)
|
||||||
|
if response.status_code == 200:
|
||||||
|
responseData = json.loads(response.text)
|
||||||
|
if responseData["code"] == 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
raise TGAIllegalDataException("Unexpected result code: " + str(responseData["code"]))
|
||||||
|
else:
|
||||||
|
raise TGANetworkException("Unexpected Http status code " + str(response.status_code))
|
||||||
|
except ConnectionError as e:
|
||||||
|
time.sleep(0.5)
|
||||||
|
raise TGANetworkException("Data transmission failed due to " + repr(e))
|
||||||
|
|
||||||
|
|
||||||
|
class DebugConsumer(object):
|
||||||
|
"""逐条、同步的发送数据给接收服务器
|
||||||
|
|
||||||
|
服务端会对数据进行严格校验,当某个属性不符合规范时,整条数据都不会入库. 当数据格式错误时抛出包含详细原因的异常信息.
|
||||||
|
建议首先使用此 Consumer 来调试埋点数据.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, server_uri, appid, timeout=30000, write_data=True):
|
||||||
|
"""创建 DebugConsumer
|
||||||
|
|
||||||
|
Args:
|
||||||
|
server_uri: 服务器的 URL 地址
|
||||||
|
appid: 项目的 APP ID
|
||||||
|
timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms
|
||||||
|
"""
|
||||||
|
server_url = urlparse(server_uri)
|
||||||
|
debug_url = server_url._replace(path='/data_debug')
|
||||||
|
self.__server_uri = debug_url.geturl()
|
||||||
|
self.__appid = appid
|
||||||
|
self.__timeout = timeout
|
||||||
|
self.__writer_data = write_data
|
||||||
|
|
||||||
|
def add(self, msg):
|
||||||
|
try:
|
||||||
|
dry_run = 0
|
||||||
|
if not self.__writer_data:
|
||||||
|
dry_run = 1
|
||||||
|
response = requests.post(self.__server_uri,
|
||||||
|
data={'source': 'server', 'appid': self.__appid, 'data': msg, 'dryRun': dry_run},
|
||||||
|
timeout=self.__timeout)
|
||||||
|
if response.status_code == 200:
|
||||||
|
responseData = json.loads(response.text)
|
||||||
|
if responseData["errorLevel"] == 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print("Unexpected result : \n %s" % response.text)
|
||||||
|
else:
|
||||||
|
raise TGANetworkException("Unexpected http status code: " + str(response.status_code))
|
||||||
|
except ConnectionError as e:
|
||||||
|
time.sleep(0.5)
|
||||||
|
raise TGANetworkException("Data transmission failed due to " + repr(e))
|
||||||
|
|
||||||
|
def flush(self, throw_exception=True):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ToKafka(object):
|
||||||
|
"""
|
||||||
|
将数据发送到kafka
|
||||||
|
注意 减少不必要的查询 分区固定设置16个
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, conf):
|
||||||
|
self.__topic_name = None
|
||||||
|
self.__producer = KafkaProducer(**conf)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def topic_name(self):
|
||||||
|
return self.__topic_name
|
||||||
|
|
||||||
|
@topic_name.setter
|
||||||
|
def topic_name(self, topic_name):
|
||||||
|
self.__topic_name = topic_name
|
||||||
|
# self.__producer.partitions_for(topic_name)
|
||||||
|
|
||||||
|
def add(self, msg):
|
||||||
|
try:
|
||||||
|
self.__producer.send(self.__topic_name, msg, partition=random.randint(0, 15))
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
def flush(self, throw_exception=True):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
Loading…
Reference in New Issue
Block a user